]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
Added ZSTDMT_initCStream_advanced() variant
authorYann Collet <cyan@fb.com>
Thu, 19 Jan 2017 23:32:07 +0000 (15:32 -0800)
committerYann Collet <cyan@fb.com>
Thu, 19 Jan 2017 23:32:07 +0000 (15:32 -0800)
Correctly compress with custom params and dictionary
Added relevant fuzzer test in zstreamtest

Also :
new macro ZSTDMT_SECTION_LOGSIZE_MIN, which sets a minimum size for a full job
(note : a flush() command can still generate a partial job anytime)

lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h
tests/Makefile
tests/zstreamtest.c

index 775c52aae84c8fe4906141340c712664cd3b0117..dd1fd3452b49412ac3b57bb849a8d4884b38150e 100644 (file)
@@ -1,3 +1,12 @@
+
+
+/* ======   Tuning parameters   ====== */
+#ifndef ZSTDMT_SECTION_LOGSIZE_MIN
+#define ZSTDMT_SECTION_LOGSIZE_MIN 20   /*< minimum size for a full compression job (20==2^20==1 MB) */
+#endif
+
+/* ======   Dependencies   ====== */
+
 #include <stdlib.h>   /* malloc */
 #include <string.h>   /* memcpy */
 #include <pool.h>     /* threadpool */
@@ -180,13 +189,15 @@ typedef struct {
     buffer_t dstBuff;
     size_t   cSize;
     size_t   dstFlushed;
-    unsigned long long fullFrameSize;
     unsigned firstChunk;
     unsigned lastChunk;
     unsigned jobCompleted;
     pthread_mutex_t* jobCompleted_mutex;
     pthread_cond_t* jobCompleted_cond;
     ZSTD_parameters params;
+    const void* dict;
+    size_t dictSize;
+    unsigned long long fullFrameSize;
 } ZSTDMT_jobDescription;
 
 /* ZSTDMT_compressChunk() : POOL_function type */
@@ -194,7 +205,7 @@ void ZSTDMT_compressChunk(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
     buffer_t const dstBuff = job->dstBuff;
-    size_t const initError = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize);
+    size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->dict, job->dictSize, job->params, job->fullFrameSize);
     if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
     if (!job->firstChunk) {  /* flush frame header */
         size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);
@@ -237,6 +248,9 @@ struct ZSTDMT_CCtx_s {
     unsigned nextJobID;
     unsigned frameEnded;
     unsigned allJobsCompleted;
+    unsigned long long frameContentSize;
+    const void* dict;
+    size_t dictSize;
     ZSTDMT_jobDescription jobs[1];   /* variable size (must lies at the end) */
 };
 
@@ -405,15 +419,20 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) {
     }
 }
 
-size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
+size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
+                                   ZSTD_parameters params, unsigned long long pledgedSrcSize) {
     if (zcs->allJobsCompleted == 0) {   /* previous job not correctly finished */
         ZSTDMT_waitForAllJobsCompleted(zcs);
         ZSTDMT_releaseAllJobResources(zcs);
         zcs->allJobsCompleted = 1;
     }
-    zcs->params = ZSTD_getParams(compressionLevel, 0, 0);
-    zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2);
-    zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog);
+    params.fParams.checksumFlag = 0;   /* current limitation : no checksum (to be lifted in a later version) */
+    zcs->params = params;
+    zcs->dict = dict;
+    zcs->dictSize = dictSize;
+    zcs->frameContentSize = pledgedSrcSize;
+    zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2));
+    zcs->inBuffSize = zcs->targetSectionSize + (1 << zcs->params.cParams.windowLog);
     zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
     if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
     zcs->inBuff.filled = 0;
@@ -424,6 +443,11 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
     return 0;
 }
 
+size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) {
+    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
+    return ZSTDMT_initCStream_advanced(zcs, NULL, 0, params, 0);
+}
+
 
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
 {
@@ -455,8 +479,10 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu
         zcs->jobs[jobID].src = zcs->inBuff.buffer;
         zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
         zcs->jobs[jobID].srcSize = zcs->targetSectionSize;
-        zcs->jobs[jobID].fullFrameSize = 0;
         zcs->jobs[jobID].params = zcs->params;
+        zcs->jobs[jobID].dict = zcs->nextJobID == 0 ? zcs->dict : NULL;
+        zcs->jobs[jobID].dictSize = zcs->nextJobID == 0 ? zcs->dictSize : 0;
+        zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
         zcs->jobs[jobID].dstBuff = dstBuffer;
         zcs->jobs[jobID].cctx = cctx;
         zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
@@ -539,8 +565,10 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
         zcs->jobs[jobID].src = zcs->inBuff.buffer;
         zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
         zcs->jobs[jobID].srcSize = srcSize;
-        zcs->jobs[jobID].fullFrameSize = 0;
         zcs->jobs[jobID].params = zcs->params;
+        zcs->jobs[jobID].dict = zcs->nextJobID == 0 ? zcs->dict : NULL;
+        zcs->jobs[jobID].dictSize = zcs->nextJobID == 0 ? zcs->dictSize : 0;
+        zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
         zcs->jobs[jobID].dstBuff = dstBuffer;
         zcs->jobs[jobID].cctx = cctx;
         zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0);
index ca5d6b60135eef7fc719d01978bdf8d919f67f52..d1b01a79e07039f5130abba66d4373ba19b81b6d 100644 (file)
@@ -1,6 +1,7 @@
 
 /* ===   Dependencies   === */
 #include <stddef.h>   /* size_t */
+#define ZSTD_STATIC_LINKING_ONLY   /* ZSTD_parameters */
 #include "zstd.h"     /* ZSTD_inBuffer, ZSTD_outBuffer */
 
 
@@ -19,6 +20,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx,
 /* ===   Streaming functions   === */
 
 size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel);
+size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize,
+                                   ZSTD_parameters params, unsigned long long pledgedSrcSize);  /**< pledgedSrcSize is optional and can be zero == unknown ; current limitation : no checksum */
+
 size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input);
-size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
-size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);
+
+size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);   /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
+size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output);     /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */
index 6bc1ad2e2cbe8e0341734571f13f882b3e446536..bbc8d3de1009e5dbe8d395e5831f93b3a4fc1dcc 100644 (file)
@@ -26,8 +26,8 @@ TESTARTEFACT := versionsTest namespaceTest
 
 CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR)
 CFLAGS  ?= -O3
-CFLAGS  += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
-          -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef
+CFLAGS  += -g -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \
+           -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef
 CFLAGS  += $(MOREFLAGS)
 FLAGS    = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS)
 
index 6cf6c4a09a62e3a3baa483e5498c4e9fede09a5c..1feec450b4db6df06734a5d09c1dfb2faf144a6b 100644 (file)
@@ -759,7 +759,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
             if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1;
             {   int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1;
                 size_t const resetError = ZSTDMT_initCStream(zc, compressionLevel);
-                CHECK(ZSTD_isError(resetError), "ZSTD_resetCStream error : %s", ZSTD_getErrorName(resetError));
+                CHECK(ZSTD_isError(resetError), "ZSTDMT_initCStream error : %s", ZSTD_getErrorName(resetError));
             }
         } else {
             U32 const testLog = FUZ_rand(&lseed) % maxSrcLog;
@@ -767,11 +767,18 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
             maxTestSize = FUZ_rLogLength(&lseed, testLog);
             oldTestLog = testLog;
             /* random dictionary selection */
-            dictSize  = 0;
-            dict = NULL;
-            {   size_t const initError = ZSTDMT_initCStream(zc, cLevel);
-                CHECK (ZSTD_isError(initError),"ZSTD_initCStream_advanced error : %s", ZSTD_getErrorName(initError));
-        }   }
+            dictSize  = ((FUZ_rand(&lseed)&63)==1) ? FUZ_randomLength(&lseed, maxSampleLog) : 0;
+            {   size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize);
+                dict = srcBuffer + dictStart;
+            }
+            {   U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 0 : maxTestSize;
+                ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize);
+                DISPLAYLEVEL(5, "Init with windowLog = %u \n", params.cParams.windowLog);
+                params.fParams.checksumFlag = FUZ_rand(&lseed) & 1;
+                params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1;
+                {   size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize);
+                    CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError));
+        }   }   }
 
         /* multi-segments compression test */
         XXH64_reset(&xxhState, 0);
@@ -790,6 +797,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp
                     DISPLAYLEVEL(5, "Sending %u bytes to compress \n", (U32)srcSize);
                     { size_t const compressionError = ZSTDMT_compressStream(zc, &outBuff, &inBuff);
                       CHECK (ZSTD_isError(compressionError), "compression error : %s", ZSTD_getErrorName(compressionError)); }
+                    DISPLAYLEVEL(5, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos);
 
                     XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos);
                     memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos);
@@ -924,8 +932,9 @@ int main(int argc, const char** argv)
     int testNb = 0;
     int proba = FUZ_COMPRESSIBILITY_DEFAULT;
     int result=0;
-    U32 mainPause = 0;
-    const char* programName = argv[0];
+    int mainPause = 0;
+    int mtOnly = 0;
+    const char* const programName = argv[0];
     ZSTD_customMem customMem = { allocFunction, freeFunction, NULL };
     ZSTD_customMem customNULL = { NULL, NULL, NULL };
 
@@ -936,8 +945,10 @@ int main(int argc, const char** argv)
 
         /* Parsing commands. Aggregated commands are allowed */
         if (argument[0]=='-') {
-            argument++;
 
+            if (!strcmp(argument, "--mt")) { mtOnly=1; continue; }
+
+            argument++;
             while (*argument!=0) {
                 switch(*argument)
                 {
@@ -1041,7 +1052,7 @@ int main(int argc, const char** argv)
             result = basicUnitTests(0, ((double)proba) / 100, customMem);  /* use custom memory allocation functions */
     }   }
 
-    if (!result) result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100);
+    if (!result && !mtOnly) result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100);
     if (!result) result = fuzzerTests_MT(seed, nbTests, testNb, ((double)proba) / 100);
 
     if (mainPause) {