]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
split compression into smaller blocks
authorPaul Cruz <paulcruz74@fb.com>
Wed, 19 Jul 2017 18:23:40 +0000 (11:23 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 19 Jul 2017 18:23:40 +0000 (11:23 -0700)
contrib/adaptive-compression/adapt.c

index cf908ca94c5cc0d4a39a8f05cf00041290a216bc..17721f87e8f486ffee720733f0ebc94426b732e6 100644 (file)
@@ -434,44 +434,66 @@ static void* compressionThread(void* arg)
 
         /* adapt compression level */
         adaptCompressionLevel(ctx);
-        
+
         /* compress the data */
         {
+            size_t const compressionBlockSize = 4 << 17; /* 128 KB */
             unsigned const cLevel = ctx->compressionLevel;
+            unsigned blockNum = 0;
+            size_t remaining = job->src.size;
+            size_t srcPos = 0;
+            size_t dstPos = 0;
+            size_t dictPos = 0;
             DEBUG(3, "cLevel used: %u\n", cLevel);
             DEBUG(3, "compression level used: %u\n", cLevel);
-            /* begin compression */
-            {
-                size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
-                DEBUG(3, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
-                size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
-                size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, cLevel);
-                size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
-                if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
-                    DISPLAY("Error: something went wrong while starting compression\n");
-                    signalErrorToThreads(ctx);
-                    return arg;
+
+            /* reset compressed size */
+            job->compressedSize = 0;
+
+            while (remaining != 0) {
+                size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
+                /* begin compression */
+                {
+                    size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
+                    DEBUG(3, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
+                    size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
+                    size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize + dictPos, useDictSize, cLevel);
+                    size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
+                    if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
+                        DISPLAY("Error: something went wrong while starting compression\n");
+                        signalErrorToThreads(ctx);
+                        return arg;
+                    }
                 }
-            }
 
-            /* continue compression */
-            if (currJob != 0) { /* not first job flush/overwrite the frame header */
-                size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, 0);
-                if (ZSTD_isError(hSize)) {
-                    DISPLAY("Error: something went wrong while continuing compression\n");
-                    job->compressedSize = hSize;
-                    signalErrorToThreads(ctx);
-                    return arg;
+                /* continue compression */
+                if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
+                    size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0);
+                    if (ZSTD_isError(hSize)) {
+                        DISPLAY("Error: something went wrong while continuing compression\n");
+                        job->compressedSize = hSize;
+                        signalErrorToThreads(ctx);
+                        return arg;
+                    }
+                    ZSTD_invalidateRepCodes(ctx->cctx);
+                }
+                {
+
+                    size_t const ret = (job->lastJob && remaining <= compressionBlockSize) ?
+                                            ZSTD_compressEnd     (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
+                                            ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
+                    if (ZSTD_isError(ret)) {
+                        DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret));
+                        signalErrorToThreads(ctx);
+                        return arg;
+                    }
+                    job->compressedSize += ret;
+                    remaining -= actualBlockSize;
+                    srcPos += actualBlockSize;
+                    dstPos += ret;
+                    dictPos += actualBlockSize;
+                    blockNum++;
                 }
-                ZSTD_invalidateRepCodes(ctx->cctx);
-            }
-            job->compressedSize = (job->lastJob) ?
-                                    ZSTD_compressEnd     (ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, job->src.size) :
-                                    ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.capacity, job->src.start + job->dictSize, job->src.size);
-            if (ZSTD_isError(job->compressedSize)) {
-                DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(job->compressedSize));
-                signalErrorToThreads(ctx);
-                return arg;
             }
             job->dst.size = job->compressedSize;
         }