]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
overlapped section, for improved compression
authorYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 06:32:12 +0000 (22:32 -0800)
committerYann Collet <cyan@fb.com>
Wed, 25 Jan 2017 06:32:12 +0000 (22:32 -0800)
Sections 2+ read a bit of data from previous section
in order to improve compression ratio.
This also costs some CPU, to reference read data.

Read data is currently fixed to window>>3 size

lib/compress/zstdmt_compress.c
lib/compress/zstdmt_compress.h

index e5790807812d7d56c5fffdf4b28559c0355df7a8..99b2e68fbfc19e8e75fd8733a062b9f1ca2e33f4 100644 (file)
@@ -209,6 +209,7 @@ typedef struct {
     buffer_t src;
     const void* srcStart;
     size_t   srcSize;
+    size_t   dictSize;
     buffer_t dstBuff;
     size_t   cSize;
     size_t   dstFlushed;
@@ -220,8 +221,6 @@ typedef struct {
     pthread_cond_t* jobCompleted_cond;
     ZSTD_parameters params;
     ZSTD_CDict* cdict;
-    const void* dict;
-    size_t dictSize;
     unsigned long long fullFrameSize;
 } ZSTDMT_jobDescription;
 
@@ -229,16 +228,18 @@ typedef struct {
 void ZSTDMT_compressChunk(void* jobDescription)
 {
     ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
+    const void* const src = (const char*)job->srcStart + job->dictSize;
     buffer_t const dstBuff = job->dstBuff;
+    DEBUGLOG(3, "job (first:%u) (last:%u) : dictSize %u, srcSize %u", job->firstChunk, job->lastChunk, (U32)job->dictSize, (U32)job->srcSize);
     if (job->cdict) {
         size_t const initError = ZSTD_compressBegin_usingCDict(job->cctx, job->cdict, job->fullFrameSize);
         if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
     } else {
-        size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->dict, job->dictSize, job->params, job->fullFrameSize);
+        size_t const initError = ZSTD_compressBegin_advanced(job->cctx, job->srcStart, job->dictSize, job->params, job->fullFrameSize);
         if (ZSTD_isError(initError)) { job->cSize = initError; goto _endJob; }
     }
     if (!job->firstChunk) {  /* flush frame header */
-        size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, 0);
+        size_t const hSize = ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, 0);
         if (ZSTD_isError(hSize)) { job->cSize = hSize; goto _endJob; }
         ZSTD_invalidateRepCodes(job->cctx);
     }
@@ -246,8 +247,8 @@ void ZSTDMT_compressChunk(void* jobDescription)
     DEBUGLOG(4, "Compressing : ");
     DEBUG_PRINTHEX(4, job->srcStart, 12);
     job->cSize = (job->lastChunk) ?   /* last chunk signal */
-                 ZSTD_compressEnd(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize) :
-                 ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, job->srcStart, job->srcSize);
+                 ZSTD_compressEnd     (job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize) :
+                 ZSTD_compressContinue(job->cctx, dstBuff.start, dstBuff.size, src, job->srcSize);
     DEBUGLOG(3, "compressed %u bytes into %u bytes   (first:%u) (last:%u)", (unsigned)job->srcSize, (unsigned)job->cSize, job->firstChunk, job->lastChunk);
 
 _endJob:
@@ -271,6 +272,8 @@ struct ZSTDMT_CCtx_s {
     pthread_cond_t jobCompleted_cond;
     size_t targetSectionSize;
     size_t inBuffSize;
+    size_t dictSize;
+    size_t targetDictSize;
     inBuff_t inBuff;
     ZSTD_parameters params;
     XXH64_state_t xxhState;
@@ -354,7 +357,7 @@ size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
     return 0;
 }
 
-unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
+size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value)
 {
     switch(parameter)
     {
@@ -503,10 +506,14 @@ static size_t ZSTDMT_initCStream_internal(ZSTDMT_CCtx* zcs,
     zcs->frameContentSize = pledgedSrcSize;
     zcs->targetSectionSize = zcs->sectionSize ? zcs->sectionSize : (size_t)1 << (zcs->params.cParams.windowLog + 2);
     zcs->targetSectionSize = MAX(ZSTDMT_SECTION_SIZE_MIN, zcs->targetSectionSize);
-    zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog);
+    //zcs->targetDictSize = ((size_t)1 << zcs->params.cParams.windowLog);   /* full window size, for test */
+    zcs->targetDictSize = ((size_t)1 << zcs->params.cParams.windowLog) >> 3;   /* fixed currently */
+    //zcs->targetDictSize = 0;
+    zcs->inBuffSize = zcs->targetSectionSize + ((size_t)1 << zcs->params.cParams.windowLog) /* margin */ + zcs->targetDictSize;
     zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
     if (zcs->inBuff.buffer.start == NULL) return ERROR(memory_allocation);
     zcs->inBuff.filled = 0;
+    zcs->dictSize = 0;
     zcs->doneJobID = 0;
     zcs->nextJobID = 0;
     zcs->frameEnded = 0;
@@ -551,15 +558,14 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
         return ERROR(memory_allocation);
     }
 
-    DEBUGLOG(4, "preparing job %u to compress %u bytes \n", zcs->nextJobID, (U32)srcSize);
+    DEBUGLOG(4, "preparing job %u to compress %u bytes with %u preload ", zcs->nextJobID, (U32)srcSize, (U32)zcs->dictSize);
     zcs->jobs[jobID].src = zcs->inBuff.buffer;
     zcs->jobs[jobID].srcStart = zcs->inBuff.buffer.start;
     zcs->jobs[jobID].srcSize = srcSize;
+    zcs->jobs[jobID].dictSize = zcs->dictSize;   /* note : zcs->inBuff.filled is presumed >= srcSize + dictSize */
     zcs->jobs[jobID].params = zcs->params;
     if (zcs->nextJobID) zcs->jobs[jobID].params.fParams.checksumFlag = 0;  /* do not calculate checksum within sections, just keep it in header for first section */
     zcs->jobs[jobID].cdict = zcs->nextJobID==0 ? zcs->cdict : NULL;
-    zcs->jobs[jobID].dict = NULL;
-    zcs->jobs[jobID].dictSize = 0;
     zcs->jobs[jobID].fullFrameSize = zcs->frameContentSize;
     zcs->jobs[jobID].dstBuff = dstBuffer;
     zcs->jobs[jobID].cctx = cctx;
@@ -572,6 +578,7 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
 
     /* get a new buffer for next input */
     if (!endFrame) {
+        size_t const newDictSize = MIN(srcSize + zcs->dictSize, zcs->targetDictSize);
         zcs->inBuff.buffer = ZSTDMT_getBuffer(zcs->buffPool, zcs->inBuffSize);
         if (zcs->inBuff.buffer.start == NULL) {   /* not enough memory to allocate next input buffer */
             zcs->jobs[jobID].jobCompleted = 1;
@@ -580,8 +587,12 @@ static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* zcs, size_t srcSize, unsi
             ZSTDMT_releaseAllJobResources(zcs);
             return ERROR(memory_allocation);
         }
-        zcs->inBuff.filled -= srcSize;
-        memcpy(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + srcSize, zcs->inBuff.filled);
+        DEBUGLOG(5, "inBuff filled to %u", (U32)zcs->inBuff.filled);
+        zcs->inBuff.filled -= srcSize + zcs->dictSize - newDictSize;
+        DEBUGLOG(5, "new job : filled to %u, with %u dict and %u src", (U32)zcs->inBuff.filled, (U32)newDictSize, (U32)(zcs->inBuff.filled - newDictSize));
+        memmove(zcs->inBuff.buffer.start, (const char*)zcs->jobs[jobID].srcStart + zcs->dictSize + srcSize - newDictSize, zcs->inBuff.filled);
+        DEBUGLOG(5, "new inBuff pre-filled");
+        zcs->dictSize = newDictSize;
     } else {
         zcs->inBuff.buffer = g_nullBuffer;
         zcs->inBuff.filled = 0;
@@ -625,7 +636,7 @@ static size_t ZSTDMT_flushNextJob(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* output, unsi
             zcs->jobs[wJobID].cctx = NULL;
             DEBUGLOG(5, "zcs->params.fParams.checksumFlag : %u ", zcs->params.fParams.checksumFlag);
             if (zcs->params.fParams.checksumFlag) {
-                XXH64_update(&zcs->xxhState, job.srcStart, job.srcSize);
+                XXH64_update(&zcs->xxhState, (const char*)job.srcStart + job.dictSize, job.srcSize);
                 if (zcs->frameEnded && (zcs->doneJobID+1 == zcs->nextJobID)) {  /* write checksum at end of last section */
                     U32 const checksum = (U32)XXH64_digest(&zcs->xxhState);
                     DEBUGLOG(4, "writing checksum : %08X \n", checksum);
@@ -689,10 +700,10 @@ static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* zcs, ZSTD_outBuffer* outp
 {
     size_t const srcSize = zcs->inBuff.filled;
 
-    DEBUGLOG(4, "flushing : %u bytes left to compress", (U32)srcSize);
+    if (srcSize) DEBUGLOG(1, "flushing : %u bytes left to compress", (U32)srcSize);
     if ( ((srcSize > 0) || (endFrame && !zcs->frameEnded))
        && (zcs->nextJobID <= zcs->doneJobID + zcs->jobIDMask) ) {
-        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize, endFrame) );
+        CHECK_F( ZSTDMT_createCompressionJob(zcs, srcSize - zcs->dictSize, endFrame) );
     }
 
     /* check if there is any data available to flush */
index c00782e9d5a2130a65ce4f6f9fa65b17c91899b8..1288c1ed0cc5cb904aa26f14400769cbeec5091b 100644 (file)
@@ -59,4 +59,4 @@ typedef enum { ZSTDMT_p_sectionSize    /* size of input "section". Each section
  * The function must be called typically after ZSTD_createCCtx().
  * Parameters not explicitly reset by ZSTDMT_init*() remain the same in consecutive compression sessions.
  * @return : 0, or an error code (which can be tested using ZSTD_isError()) */
-ZSTDLIB_API unsigned ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value);
+ZSTDLIB_API size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSDTMT_parameter parameter, unsigned value);