]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
made some progress on improving compression ratio, but problems exist with speed...
authorPaul Cruz <paulcruz74@fb.com>
Tue, 11 Jul 2017 01:16:42 +0000 (18:16 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Tue, 11 Jul 2017 01:16:42 +0000 (18:16 -0700)
contrib/adaptive-compression/adapt.c

index 8f3acd5c915da106b510ec6d11b256cf5d53c2ac..2bf8bb81e57b465b113eea5d3c8954da9eced21a 100644 (file)
@@ -1,6 +1,6 @@
 #define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
 #define PRINT(...) fprintf(stdout, __VA_ARGS__)
-#define DEBUGLOG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
+#define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
 #define FILE_CHUNK_SIZE 4 << 20
 #define MAX_NUM_JOBS 2;
 #define stdinmark  "/*stdin*\\"
@@ -15,7 +15,7 @@ typedef unsigned char BYTE;
 #include <stdlib.h>     /* malloc, free */
 #include <pthread.h>    /* pthread functions */
 #include <string.h>     /* memset */
-#include "zstd.h"
+#include "zstd_internal.h"
 #include "util.h"
 
 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
@@ -48,6 +48,7 @@ typedef struct {
 typedef struct {
     buffer_t src;
     buffer_t dst;
+    buffer_t dict;
     unsigned compressionLevel;
     unsigned jobID;
     unsigned lastJob;
@@ -87,6 +88,7 @@ static void freeCompressionJobs(adaptCCtx* ctx)
         jobDescription job = ctx->jobs[u];
         free(job.dst.start);
         free(job.src.start);
+        free(job.dict.start);
     }
 }
 
@@ -142,8 +144,9 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
             jobDescription* job = &ctx->jobs[jobNum];
             job->src.start = malloc(FILE_CHUNK_SIZE);
             job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
+            job->dict.start = malloc(FILE_CHUNK_SIZE);
             job->lastJob = 0;
-            if (!job->src.start || !job->dst.start) {
+            if (!job->src.start || !job->dst.start || !job->dict.start) {
                 DISPLAY("Could not allocate buffers for jobs\n");
                 freeCCtx(ctx);
                 return NULL;
@@ -207,17 +210,17 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
     unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting)) ? 1 : 0;
     unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting)) ? 1 : 0;
     unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting)) ? 1 : 0;
-    DEBUGLOG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
+    DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
     if (allSlow) {
         reset = 1;
     }
     else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
-        DEBUGLOG(2, "increasing compression level %u\n", ctx->compressionLevel);
+        DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel);
         ctx->compressionLevel++;
         reset = 1;
     }
     else if (compressSlow && ctx->compressionLevel > 1) {
-        DEBUGLOG(2, "decreasing compression level %u\n", ctx->compressionLevel);
+        DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel);
         ctx->compressionLevel--;
         reset = 1;
     }
@@ -236,38 +239,63 @@ static void* compressionThread(void* arg)
     for ( ; ; ) {
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
-        DEBUGLOG(2, "compressionThread(): waiting on job ready\n");
+        DEBUG(2, "compressionThread(): waiting on job ready\n");
         pthread_mutex_lock(&ctx->jobReady_mutex);
         while(currJob + 1 > ctx->jobReadyID) {
             ctx->stats.waitReady++;
             ctx->stats.readyCounter++;
-            DEBUGLOG(2, "waiting on job ready, nextJob: %u\n", currJob);
+            DEBUG(2, "waiting on job ready, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
         }
         pthread_mutex_unlock(&ctx->jobReady_mutex);
-        DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
+        DEBUG(2, "compressionThread(): continuing after job ready\n");
         /* compress the data */
         {
             unsigned const cLevel = adaptCompressionLevel(ctx);
-            DEBUGLOG(2, "cLevel used: %u\n", cLevel);
-            size_t const compressedSize = ZSTD_compressCCtx(ctx->cctx, job->dst.start, job->dst.size, job->src.start, job->src.size, cLevel);
-            if (ZSTD_isError(compressedSize)) {
+            ZSTD_parameters params = ZSTD_getParams(cLevel, job->src.size, 0);
+            DEBUG(2, "cLevel used: %u\n", cLevel);
+
+            /* begin compression */
+            {
+                size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
+                size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->dict.start, job->dict.size, params, 0);
+                size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
+                if (ZSTD_isError(dictModeError) || ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
+                    DISPLAY("Error: something went wrong while starting compression\n");
+                    ctx->threadError = 1;
+                    return arg;
+                }
+            }
+
+            /* continue compression */
+            if (currJob != 0) { /* not first job */
+                size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.size, job->src.start, 0);
+                if (ZSTD_isError(hSize)) {
+                    job->compressedSize = hSize;
+                    ctx->threadError = 1;
+                    return arg;
+                }
+                ZSTD_invalidateRepCodes(ctx->cctx);
+            }
+            job->compressedSize = (job->lastJob) ?
+                                    ZSTD_compressEnd     (ctx->cctx, job->dst.start, job->dst.size, job->src.start, job->src.size) :
+                                    ZSTD_compressContinue(ctx->cctx, job->dst.start, job->dst.size, job->src.start, job->src.size);
+            if (ZSTD_isError(job->compressedSize)) {
+                DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(job->compressedSize));
                 ctx->threadError = 1;
-                DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(compressedSize));
                 return arg;
             }
-            job->compressedSize = compressedSize;
         }
         pthread_mutex_lock(&ctx->jobCompressed_mutex);
         ctx->jobCompressedID++;
-        DEBUGLOG(2, "signaling for job %u\n", currJob);
+        DEBUG(2, "signaling for job %u\n", currJob);
         pthread_cond_signal(&ctx->jobCompressed_cond);
         pthread_mutex_unlock(&ctx->jobCompressed_mutex);
-        DEBUGLOG(2, "finished job compression %u\n", currJob);
+        DEBUG(2, "finished job compression %u\n", currJob);
         currJob++;
         if (job->lastJob || ctx->threadError) {
             /* finished compressing all jobs */
-            DEBUGLOG(2, "all jobs finished compressing\n");
+            DEBUG(2, "all jobs finished compressing\n");
             break;
         }
     }
@@ -299,16 +327,16 @@ static void* outputThread(void* arg)
     for ( ; ; ) {
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
-        DEBUGLOG(2, "outputThread(): waiting on job compressed\n");
+        DEBUG(2, "outputThread(): waiting on job compressed\n");
         pthread_mutex_lock(&ctx->jobCompressed_mutex);
         while (currJob + 1 > ctx->jobCompressedID) {
             ctx->stats.waitCompressed++;
             ctx->stats.compressedCounter++;
-            DEBUGLOG(2, "waiting on job compressed, nextJob: %u\n", currJob);
+            DEBUG(2, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond, &ctx->jobCompressed_mutex);
         }
         pthread_mutex_unlock(&ctx->jobCompressed_mutex);
-        DEBUGLOG(2, "outputThread(): continuing after job compressed\n");
+        DEBUG(2, "outputThread(): continuing after job compressed\n");
         {
             size_t const compressedSize = job->compressedSize;
             if (ZSTD_isError(compressedSize)) {
@@ -325,19 +353,19 @@ static void* outputThread(void* arg)
                 }
             }
         }
-        DEBUGLOG(2, "finished job write %u\n", currJob);
+        DEBUG(2, "finished job write %u\n", currJob);
         currJob++;
         displayProgress(currJob, ctx->compressionLevel, job->lastJob);
-        DEBUGLOG(2, "locking job write mutex\n");
+        DEBUG(2, "locking job write mutex\n");
         pthread_mutex_lock(&ctx->jobWrite_mutex);
         ctx->jobWriteID++;
         pthread_cond_signal(&ctx->jobWrite_cond);
         pthread_mutex_unlock(&ctx->jobWrite_mutex);
-        DEBUGLOG(2, "unlocking job write mutex\n");
+        DEBUG(2, "unlocking job write mutex\n");
 
         if (job->lastJob || ctx->threadError) {
             /* finished with all jobs */
-            DEBUGLOG(2, "all jobs finished writing\n");
+            DEBUG(2, "all jobs finished writing\n");
             pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
             ctx->allJobsCompleted = 1;
             pthread_cond_signal(&ctx->allJobsCompleted_cond);
@@ -353,17 +381,17 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     unsigned const nextJob = ctx->nextJobID;
     unsigned const nextJobIndex = nextJob % ctx->numJobs;
     jobDescription* job = &ctx->jobs[nextJobIndex];
-    DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
+    DEBUG(2, "createCompressionJob(): wait for job write\n");
     pthread_mutex_lock(&ctx->jobWrite_mutex);
-    DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
+    DEBUG(2, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
     while (nextJob - ctx->jobWriteID >= ctx->numJobs) {
         ctx->stats.waitWrite++;
         ctx->stats.writeCounter++;
-        DEBUGLOG(2, "waiting on job Write, nextJob: %u\n", nextJob);
+        DEBUG(2, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
     }
     pthread_mutex_unlock(&ctx->jobWrite_mutex);
-    DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
+    DEBUG(2, "createCompressionJob(): continuing after job write\n");
 
 
     job->compressionLevel = ctx->compressionLevel;
@@ -371,13 +399,21 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     job->dst.size = ZSTD_compressBound(srcSize);
     job->jobID = nextJob;
     job->lastJob = last;
-    memcpy(job->src.start, ctx->input.buffer.start, srcSize);
+    memcpy(job->src.start, ctx->input.buffer.start + ctx->input.filled, srcSize);
+    job->dict.size = ctx->input.filled;
+    memcpy(job->dict.start, ctx->input.buffer.start, ctx->input.filled);
     pthread_mutex_lock(&ctx->jobReady_mutex);
     ctx->jobReadyID++;
     pthread_cond_signal(&ctx->jobReady_cond);
     pthread_mutex_unlock(&ctx->jobReady_mutex);
-    DEBUGLOG(2, "finished job creation %u\n", nextJob);
+    DEBUG(2, "finished job creation %u\n", nextJob);
     ctx->nextJobID++;
+
+    /* if not on the last job, reuse data as dictionary in next job */
+    if (!last) {
+        ctx->input.filled = srcSize;
+        memmove(ctx->input.buffer.start, ctx->input.buffer.start + ctx->input.filled, srcSize);
+    }
     return 0;
 }
 
@@ -447,7 +483,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
 
     /* creating jobs */
     for ( ; ; ) {
-        size_t const readSize = fread(ctx->input.buffer.start, 1, FILE_CHUNK_SIZE, srcFile);
+        size_t const readSize = fread(ctx->input.buffer.start + ctx->input.filled, 1, FILE_CHUNK_SIZE, srcFile);
         if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) {
             DISPLAY("Error: problem occurred during read from src file\n");
             ctx->threadError = 1;
@@ -466,7 +502,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
             }
         }
         if (feof(srcFile)) {
-            DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
+            DEBUG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
             break;
         }
     }
@@ -563,7 +599,7 @@ int main(int argCount, const char* argv[])
                 case 'i':
                     argument += 2;
                     g_compressionLevel = readU32FromChar(&argument);
-                    DEBUGLOG(2, "g_compressionLevel: %u\n", g_compressionLevel);
+                    DEBUG(2, "g_compressionLevel: %u\n", g_compressionLevel);
                     break;
                 case 's':
                     g_displayStats = 1;