]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
zstdmt : can compress at block granularity
authorYann Collet <cyan@fb.com>
Sat, 13 Jan 2018 21:18:57 +0000 (13:18 -0800)
committerYann Collet <cyan@fb.com>
Sat, 13 Jan 2018 21:18:57 +0000 (13:18 -0800)
offering perspective of more accurate progression report.

lib/compress/zstdmt_compress.c
programs/fileio.c

index e51edf124f8556a6d6a09a470918f31b5a8cc16a..3a5b58a72ce9920295f83b745824d7477aec60b9 100644 (file)
@@ -22,6 +22,7 @@
 
 /* ======   Dependencies   ====== */
 #include <string.h>      /* memcpy, memset */
+#include <limits.h>      /* INT_MAX */
 #include "pool.h"        /* threadpool */
 #include "threading.h"   /* mutex */
 #include "zstd_compress_internal.h"  /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
@@ -129,7 +130,7 @@ static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
 static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
 {
     size_t const poolSize = sizeof(*bufPool)
-                            + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
+                          + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
     unsigned u;
     size_t totalBufferSize = 0;
     ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
@@ -201,20 +202,6 @@ static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
     ZSTD_free(buf.start, bufPool->cMem);
 }
 
-/* Sets parameters relevant to the compression job, initializing others to
- * default values. Notably, nbThreads should probably be zero. */
-static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
-{
-    ZSTD_CCtx_params jobParams;
-    memset(&jobParams, 0, sizeof(jobParams));
-
-    jobParams.cParams = params.cParams;
-    jobParams.fParams = params.fParams;
-    jobParams.compressionLevel = params.compressionLevel;
-
-    jobParams.ldmParams = params.ldmParams;
-    return jobParams;
-}
 
 /* =====   CCtx Pool   ===== */
 /* a single CCtx Pool can be invoked from multiple threads in parallel */
@@ -305,13 +292,16 @@ static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
 }
 
 
-/* =====   Thread worker   ===== */
+/* ------------------------------------------ */
+/* =====          Thread worker         ===== */
+/* ------------------------------------------ */
 
 typedef struct {
     buffer_t src;
     const void* srcStart;
     size_t   prefixSize;
     size_t   srcSize;
+    size_t   readSize;
     buffer_t dstBuff;
     size_t   cSize;
     size_t   dstFlushed;
@@ -328,21 +318,19 @@ typedef struct {
     unsigned long long fullFrameSize;
 } ZSTDMT_jobDescription;
 
-/* ZSTDMT_compressChunk() : POOL_function type */
+/* ZSTDMT_compressChunk() is a POOL_function type */
 void ZSTDMT_compressChunk(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
     ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
     const void* const src = (const char*)job->srcStart + job->prefixSize;
     buffer_t dstBuff = job->dstBuff;
-    DEBUGLOG(5, "ZSTDMT_compressChunk: job (first:%u) (last:%u) : prefixSize %u, srcSize %u ",
-                 job->firstChunk, job->lastChunk, (U32)job->prefixSize, (U32)job->srcSize);
 
+    /* resources */
     if (cctx==NULL) {
         job->cSize = ERROR(memory_allocation);
         goto _endJob;
     }
-
     if (dstBuff.start == NULL) {
         dstBuff = ZSTDMT_getBuffer(job->bufPool);
         if (dstBuff.start==NULL) {
@@ -350,30 +338,26 @@ void ZSTDMT_compressChunk(void* jobDescription)
             goto _endJob;
         }
         job->dstBuff = dstBuff;
-        DEBUGLOG(5, "ZSTDMT_compressChunk: received dstBuff of size %u", (U32)dstBuff.size);
     }
 
+    /* init */
     if (job->cdict) {
         size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dm_auto, job->cdict, job->params, job->fullFrameSize);
-        DEBUGLOG(4, "ZSTDMT_compressChunk: init using CDict (windowLog=%u)", job->params.cParams.windowLog);
         assert(job->firstChunk);  /* only allowed for first job */
         if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
     } else {  /* srcStart points at reloaded section */
         U64 const pledgedSrcSize = job->firstChunk ? job->fullFrameSize : ZSTD_CONTENTSIZE_UNKNOWN;
         ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */
-        size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
-        if (ZSTD_isError(forceWindowError)) {
-            DEBUGLOG(5, "ZSTD_CCtxParam_setParameter error : %s ", ZSTD_getErrorName(forceWindowError));
-            job->cSize = forceWindowError;
-            goto _endJob;
-        }
-        DEBUGLOG(5, "ZSTDMT_compressChunk: invoking ZSTD_compressBegin_advanced_internal with windowLog = %u ", jobParams.cParams.windowLog);
+        {   size_t const forceWindowError = ZSTD_CCtxParam_setParameter(&jobParams, ZSTD_p_forceMaxWindow, !job->firstChunk);
+            if (ZSTD_isError(forceWindowError)) {
+                job->cSize = forceWindowError;
+                goto _endJob;
+        }   }
         {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                                         job->srcStart, job->prefixSize, ZSTD_dm_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
                                         NULL,
                                         jobParams, pledgedSrcSize);
             if (ZSTD_isError(initError)) {
-                DEBUGLOG(5, "ZSTD_compressBegin_advanced_internal error : %s ", ZSTD_getErrorName(initError));
                 job->cSize = initError;
                 goto _endJob;
         }   }
@@ -384,19 +368,50 @@ void ZSTDMT_compressChunk(void* jobDescription)
         ZSTD_invalidateRepCodes(cctx);
     }
 
-    DEBUGLOG(5, "Compressing into dstBuff of size %u", (U32)dstBuff.size);
-    DEBUG_PRINTHEX(6, job->srcStart, 12);
+    /* compress */
+#if 1
     job->cSize = (job->lastChunk) ?
                  ZSTD_compressEnd     (cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
                  ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
-    DEBUGLOG(5, "compressed %u bytes into %u bytes   (first:%u) (last:%u) ",
-                (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
-    DEBUGLOG(5, "dstBuff.size : %u ; => %s ", (U32)dstBuff.size, ZSTD_getErrorName(job->cSize));
+#else
+    if (sizeof(size_t) > sizeof(int))
+        assert(job->srcSize < ((size_t)INT_MAX) * ZSTD_BLOCKSIZE_MAX);   /* check overflow */
+    {   int const nbBlocks = (int)((job->srcSize + (ZSTD_BLOCKSIZE_MAX-1)) / ZSTD_BLOCKSIZE_MAX);
+        const BYTE* ip = (const BYTE*) src;
+        BYTE* const ostart = (BYTE*)dstBuff.start;
+        BYTE* op = ostart;
+        BYTE* oend = op + dstBuff.size;
+        int blockNb;
+        job->cSize = 0;
+        for (blockNb = 0; blockNb < nbBlocks-1; blockNb++) {
+            size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, ZSTD_BLOCKSIZE_MAX);
+            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            ip += ZSTD_BLOCKSIZE_MAX;
+            op += cSize; assert(op < oend);
+            /* stats */
+            job->cSize += cSize;
+            job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1);
+        }
+        /* last block */
+        {   size_t const lastBlockSize1 = job->srcSize & (ZSTD_BLOCKSIZE_MAX-1);
+            size_t const lastBlockSize = (lastBlockSize1==0) ? ZSTD_BLOCKSIZE_MAX : lastBlockSize1;
+            size_t const cSize = (job->lastChunk) ?
+                 ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
+                 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
+            if (ZSTD_isError(cSize)) { job->cSize = cSize; goto _endJob; }
+            /* stats */
+            job->cSize += cSize;
+            job->readSize = ZSTD_BLOCKSIZE_MAX * (blockNb+1);
+        }
+    }
+#endif
 
 _endJob:
+    /* release */
     ZSTDMT_releaseCCtx(job->cctxPool, cctx);
     ZSTDMT_releaseBuffer(job->bufPool, job->src);
     job->src = g_nullBuffer; job->srcStart = NULL;
+    /* report */
     ZSTD_PTHREAD_MUTEX_LOCK(job->jobCompleted_mutex);
     job->jobCompleted = 1;
     job->jobScanned = 0;
@@ -440,6 +455,21 @@ struct ZSTDMT_CCtx_s {
     const ZSTD_CDict* cdict;
 };
 
+/* Builds per-job params : copies cParams, fParams, compressionLevel and ldmParams;
+ * every other field is zeroed by memset(). Notably, nbThreads should probably be zero. */
+static ZSTD_CCtx_params ZSTDMT_makeJobCCtxParams(ZSTD_CCtx_params const params)
+{
+    ZSTD_CCtx_params jobParams;
+    memset(&jobParams, 0, sizeof(jobParams));
+
+    jobParams.cParams = params.cParams;
+    jobParams.fParams = params.fParams;
+    jobParams.compressionLevel = params.compressionLevel;
+
+    jobParams.ldmParams = params.ldmParams;
+    return jobParams;
+}
+
 static ZSTDMT_jobDescription* ZSTDMT_allocJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
 {
     U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
@@ -908,6 +938,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
     zcs->jobs[jobID].src = zcs->inBuff.buffer;
     zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
     zcs->jobs[jobID].srcSize = srcSize;
+    zcs->jobs[jobID].readSize = 0;
     zcs->jobs[jobID].prefixSize = zcs->dictSize;
     assert(zcs->inBuff.filled >= srcSize + zcs->dictSize);
     zcs->jobs[jobID].params = zcs->params;
index 887cbeb378cc93d5fe08a005945961dd2a603b34..3ae2d40575fb6fe778dd127df7aab546eab91903 100644 (file)
@@ -828,7 +828,7 @@ finish:
     /* Status */
     DISPLAYLEVEL(2, "\r%79s\r", "");
     DISPLAYLEVEL(2,"%-20s :%6.2f%%   (%6llu => %6llu bytes, %s) \n", srcFileName,
-        (double)compressedfilesize/(readsize+(!readsize) /* avoid div by zero */ )*100,
+        (double)compressedfilesize / (readsize+(!readsize)/*avoid div by zero*/) * 100,
         (unsigned long long)readsize, (unsigned long long) compressedfilesize,
          dstFileName);