From: Yann Collet Date: Thu, 19 Jan 2017 23:32:07 +0000 (-0800) Subject: Added ZSTDMT_initCStream_advanced() variant X-Git-Tag: v1.1.3^2~19^2~29 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=19d670ba9d59b987c51dab5d900d45c62b24490a;p=thirdparty%2Fzstd.git Added ZSTDMT_initCStream_advanced() variant Correctly compress with custom params and dictionary Added relevant fuzzer test in zstreamtest Also : new macro ZSTDMT_SECTION_LOGSIZE_MIN, which sets a minimum size for a full job (note : a flush() command can still generate a partial job anytime) --- diff --git a/lib/compress/zstdmt_compress.c b/lib/compress/zstdmt_compress.c index 775c52aae..dd1fd3452 100644 --- a/lib/compress/zstdmt_compress.c +++ b/lib/compress/zstdmt_compress.c @@ -1,3 +1,12 @@ + + +/* ====== Tuning parameters ====== */ +#ifndef ZSTDMT_SECTION_LOGSIZE_MIN +#define ZSTDMT_SECTION_LOGSIZE_MIN 20 /*< minimum size for a full compression job (20==2^20==1 MB) */ +#endif + +/* ====== Dependencies ====== */ + #include /* malloc */ #include /* memcpy */ #include /* threadpool */ @@ -180,13 +189,15 @@ typedef struct { buffer_t dstBuff; size_t cSize; size_t dstFlushed; - unsigned long long fullFrameSize; unsigned firstChunk; unsigned lastChunk; unsigned jobCompleted; pthread_mutex_t* jobCompleted_mutex; pthread_cond_t* jobCompleted_cond; ZSTD_parameters params; + const void* dict; + size_t dictSize; + unsigned long long fullFrameSize; } ZSTDMT_jobDescription; /* ZSTDMT_compressChunk() : POOL_function type */ @@ -194,7 +205,7 @@ void ZSTDMT_compressChunk(void* jobDescription) { ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription; buffer_t const dstBuff = job->dstBuff; - size_t const initError = ZSTD_compressBegin_advanced(job->cctx, NULL, 0, job->params, job->fullFrameSize); + size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->dict, job->dictSize, job->params, job->fullFrameSize); if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; } if (!job->firstChunk) { /* flush frame header */ size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0); @@ -237,6 +248,9 @@ struct ZSTDMT_CCtx_s { unsigned nextJobID; unsigned frameEnded; unsigned allJobsCompleted; + unsigned long long frameContentSize; + const void* dict; + size_t dictSize; ZSTDMT_jobDescription jobs[1]; /* variable size (must lies at the end) */ }; @@ -405,15 +419,20 @@ static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* zcs) { } } -size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { +size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, + ZSTD_parameters params, unsigned long long pledgedSrcSize) { if (zcs->allJobsCompleted == 0) { /* previous job not correctly finished */ ZSTDMT_waitForAllJobsCompleted(zcs); ZSTDMT_releaseAllJobResources(zcs); zcs->allJobsCompleted = 1; } - zcs->params = ZSTD_getParams(compressionLevel, 0, 0); - zcs->targetSectionSize = (size_t)1 << (zcs->params.cParams.windowLog + 2); - zcs->inBuffSize = 5 * (1 << zcs->params.cParams.windowLog); + params.fParams.checksumFlag = 0; /* current limitation : no checksum (to be lifted in a later version) */ + zcs->params = params; + zcs->dict = dict; + zcs->dictSize = dictSize; + zcs->frameContentSize = pledgedSrcSize; + zcs->targetSectionSize = (size_t)1 << MAX(ZSTDMT_SECTION_LOGSIZE_MIN, (zcs->params.cParams.windowLog + 2)); + zcs->inBuffSize = zcs->targetSectionSize + (1 << zcs->params.cParams.windowLog); zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize); if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation); zcs->inBuff.filled = 0; @@ -424,6 +443,11 @@ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { return 0; } +size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel) { + ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0); + return ZSTDMT_initCStream_advanced(zcs, NULL, 0, params, 0); +} + size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input) { @@ -455,8 +479,10 @@ size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBu zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = zcs->targetSectionSize; - zcs->jobs[jobID].fullFrameSize = 0; zcs->jobs[jobID].params = zcs->params; + zcs->jobs[jobID].dict = zcs->nextJobID == 0 ? zcs->dict : NULL; + zcs->jobs[jobID].dictSize = zcs->nextJobID == 0 ? zcs->dictSize : 0; + zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].cctx = cctx; zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); @@ -539,8 +565,10 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp zcs->jobs[jobID].src = zcs->inBuff.buffer; zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start; zcs->jobs[jobID].srcSize = srcSize; - zcs->jobs[jobID].fullFrameSize = 0; zcs->jobs[jobID].params = zcs->params; + zcs->jobs[jobID].dict = zcs->nextJobID == 0 ? zcs->dict : NULL; + zcs->jobs[jobID].dictSize = zcs->nextJobID == 0 ? zcs->dictSize : 0; + zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize; zcs->jobs[jobID].dstBuff = dstBuffer; zcs->jobs[jobID].cctx = cctx; zcs->jobs[jobID].firstChunk = (zcs->nextJobID==0); diff --git a/lib/compress/zstdmt_compress.h b/lib/compress/zstdmt_compress.h index ca5d6b601..d1b01a79e 100644 --- a/lib/compress/zstdmt_compress.h +++ b/lib/compress/zstdmt_compress.h @@ -1,6 +1,7 @@ /* === Dependencies === */ #include /* size_t */ +#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_parameters */ #include "zstd.h" /* ZSTD_inBuffer, ZSTD_outBuffer */ @@ -19,6 +20,10 @@ size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* cctx, /* === Streaming functions === */ size_t ZSTDMT_initCStream(ZSTDMT_CCtx* zcs, int compressionLevel); +size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* zcs, const void* dict, size_t dictSize, + ZSTD_parameters params, unsigned long long pledgedSrcSize); /**< pledgedSrcSize is optional and can be zero == unknown ; current limitation : no checksum */ + size_t ZSTDMT_compressStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, ZSTD_inBuffer* input); -size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); -size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); + +size_t ZSTDMT_flushStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ +size_t ZSTDMT_endStream(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output); /**< @return : 0 == all flushed; >0 : still some data to be flushed; or an error code (ZSTD_isError()) */ diff --git a/tests/Makefile b/tests/Makefile index 6bc1ad2e2..bbc8d3de1 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -26,8 +26,8 @@ TESTARTEFACT := versionsTest namespaceTest CPPFLAGS+= -I$(ZSTDDIR) -I$(ZSTDDIR)/common -I$(ZSTDDIR)/compress -I$(ZSTDDIR)/dictBuilder -I$(ZSTDDIR)/deprecated -I$(PRGDIR) CFLAGS ?= -O3 -CFLAGS += -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \ - -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef +CFLAGS += -g -Wall -Wextra -Wcast-qual -Wcast-align -Wshadow -Wstrict-aliasing=1 \ + -Wswitch-enum -Wdeclaration-after-statement -Wstrict-prototypes -Wundef CFLAGS += $(MOREFLAGS) FLAGS = $(CPPFLAGS) $(CFLAGS) $(LDFLAGS) diff --git a/tests/zstreamtest.c b/tests/zstreamtest.c index 6cf6c4a09..1feec450b 100644 --- a/tests/zstreamtest.c +++ b/tests/zstreamtest.c @@ -759,7 +759,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp if (maxTestSize >= srcBufferSize) maxTestSize = srcBufferSize-1; { int const compressionLevel = (FUZ_rand(&lseed) % 5) + 1; size_t const resetError = ZSTDMT_initCStream(zc, compressionLevel); - CHECK(ZSTD_isError(resetError), "ZSTD_resetCStream error : %s", ZSTD_getErrorName(resetError)); + CHECK(ZSTD_isError(resetError), "ZSTDMT_initCStream error : %s", ZSTD_getErrorName(resetError)); } } else { U32 const testLog = FUZ_rand(&lseed) % maxSrcLog; @@ -767,11 +767,18 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp maxTestSize = FUZ_rLogLength(&lseed, testLog); oldTestLog = testLog; /* random dictionary selection */ - dictSize = 0; - dict = NULL; - { size_t const initError = ZSTDMT_initCStream(zc, cLevel); - CHECK (ZSTD_isError(initError),"ZSTD_initCStream_advanced error : %s", ZSTD_getErrorName(initError)); - } } + dictSize = ((FUZ_rand(&lseed)&63)==1) ? FUZ_randomLength(&lseed, maxSampleLog) : 0; + { size_t const dictStart = FUZ_rand(&lseed) % (srcBufferSize - dictSize); + dict = srcBuffer + dictStart; + } + { U64 const pledgedSrcSize = (FUZ_rand(&lseed) & 3) ? 0 : maxTestSize; + ZSTD_parameters params = ZSTD_getParams(cLevel, pledgedSrcSize, dictSize); + DISPLAYLEVEL(5, "Init with windowLog = %u \n", params.cParams.windowLog); + params.fParams.checksumFlag = FUZ_rand(&lseed) & 1; + params.fParams.noDictIDFlag = FUZ_rand(&lseed) & 1; + { size_t const initError = ZSTDMT_initCStream_advanced(zc, dict, dictSize, params, pledgedSrcSize); + CHECK (ZSTD_isError(initError),"ZSTDMT_initCStream_advanced error : %s", ZSTD_getErrorName(initError)); + } } } /* multi-segments compression test */ XXH64_reset(&xxhState, 0); @@ -790,6 +797,7 @@ static int fuzzerTests_MT(U32 seed, U32 nbTests, unsigned startTest, double comp DISPLAYLEVEL(5, "Sending %u bytes to compress \n", (U32)srcSize); { size_t const compressionError = ZSTDMT_compressStream(zc, &outBuff, &inBuff); CHECK (ZSTD_isError(compressionError), "compression error : %s", ZSTD_getErrorName(compressionError)); } + DISPLAYLEVEL(5, "%u bytes read by ZSTDMT_compressStream \n", (U32)inBuff.pos); XXH64_update(&xxhState, srcBuffer+srcStart, inBuff.pos); memcpy(copyBuffer+totalTestSize, srcBuffer+srcStart, inBuff.pos); @@ -924,8 +932,9 @@ int main(int argc, const char** argv) int testNb = 0; int proba = FUZ_COMPRESSIBILITY_DEFAULT; int result=0; - U32 mainPause = 0; - const char* programName = argv[0]; + int mainPause = 0; + int mtOnly = 0; + const char* const programName = argv[0]; ZSTD_customMem customMem = { allocFunction, freeFunction, NULL }; ZSTD_customMem customNULL = { NULL, NULL, NULL }; @@ -936,8 +945,10 @@ int main(int argc, const char** argv) /* Parsing commands. Aggregated commands are allowed */ if (argument[0]=='-') { - argument++; + if (!strcmp(argument, "--mt")) { mtOnly=1; continue; } + + argument++; while (*argument!=0) { switch(*argument) { @@ -1041,7 +1052,7 @@ int main(int argc, const char** argv) result = basicUnitTests(0, ((double)proba) / 100, customMem); /* use custom memory allocation functions */ } } - if (!result) result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100); + if (!result && !mtOnly) result = fuzzerTests(seed, nbTests, testNb, ((double)proba) / 100); if (!result) result = fuzzerTests_MT(seed, nbTests, testNb, ((double)proba) / 100); if (mainPause) {