]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
change parameters for compression level adapt
authorPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 00:59:50 +0000 (17:59 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 00:59:50 +0000 (17:59 -0700)
contrib/adaptive-compression/adapt.c

index 01a73a669a7a40b9fed69dc3add78b5c41874d90..62f5ec912f92f6246a8c1633a70cebfc8316ff90 100644 (file)
@@ -25,7 +25,7 @@
 #define DEFAULT_DISPLAY_LEVEL 1
 #define DEFAULT_COMPRESSION_LEVEL 6
 #define DEFAULT_ADAPT_PARAM 1
-#define MAX_COMPRESSION_LEVEL_CHANGE 10
+#define MAX_COMPRESSION_LEVEL_CHANGE 3
 
 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@@ -277,6 +277,15 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
     pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
 }
 
+/* this function normalizes counters when compression level is changing */
+static void reduceCounters(adaptCCtx* ctx)
+{
+    unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
+    ctx->stats.writeCounter -= min;
+    ctx->stats.compressedCounter -= min;
+    ctx->stats.readyCounter -= min;
+}
+
 /*
  * Compression level is changed depending on which part of the compression process is lagging
  * Currently, three theads exist for job creation, compression, and file writing respectively.
@@ -285,10 +294,10 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
  * compression thread lag           => decreased compression level
  * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait
  */
-static unsigned adaptCompressionLevel(adaptCCtx* ctx)
+static void adaptCompressionLevel(adaptCCtx* ctx)
 {
     if (g_forceCompressionLevel) {
-        return g_compressionLevel;
+        ctx->compressionLevel = g_compressionLevel;
     }
     else {
         unsigned reset = 0;
@@ -296,10 +305,11 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
         unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
         unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
         unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
-        unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting));
-        unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting));
-        unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting));
-        DEBUG(3, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
+        unsigned const writeSlow = (compressWaiting && createWaiting);
+        unsigned const compressSlow = (writeWaiting && createWaiting);
+        unsigned const createSlow = (compressWaiting && writeWaiting);
+        DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
+        DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
         if (allSlow) {
             reset = 1;
         }
@@ -310,10 +320,10 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
         }
         else if (compressSlow && ctx->compressionLevel > 1) {
             double const completion = ctx->completion;
-            unsigned const maxChange = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
+            unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
             unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
             DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
-            DEBUG(2, "completion: %f\n", completion);
+            DEBUG(3, "completion: %f\n", completion);
             ctx->compressionLevel -= change;
             reset = 1;
         }
@@ -324,7 +334,6 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
             ctx->completion = 1;
             ctx->completionMeasured = 0;
         }
-        return ctx->compressionLevel;
     }
 }
 
@@ -348,6 +357,8 @@ static void* compressionThread(void* arg)
         while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
             ctx->stats.waitReady++;
             ctx->stats.readyCounter++;
+            reduceCounters(ctx);
+            adaptCompressionLevel(ctx);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
         }
@@ -357,13 +368,13 @@ static void* compressionThread(void* arg)
         DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
         /* compress the data */
         {
-            unsigned const cLevel = adaptCompressionLevel(ctx);
+            unsigned const cLevel = ctx->compressionLevel;
             DEBUG(3, "cLevel used: %u\n", cLevel);
             DEBUG(3, "compression level used: %u\n", cLevel);
             /* begin compression */
             {
                 size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
-                DEBUG(2, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
+                DEBUG(3, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
                 size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
                 size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, cLevel);
                 size_t const windowSizeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceWindow, 1);
@@ -443,11 +454,13 @@ static void* outputThread(void* arg)
         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
             ctx->stats.waitCompressed++;
             ctx->stats.compressedCounter++;
+            reduceCounters(ctx);
             if (!ctx->completionMeasured) {
                 ctx->completion = ZSTD_getCompletion(ctx->cctx);
                 ctx->completionMeasured = 1;
             }
-            DEBUG(2, "output detected completion: %f\n", ctx->completion);
+            adaptCompressionLevel(ctx);
+            DEBUG(3, "output detected completion: %f\n", ctx->completion);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
         }
@@ -503,11 +516,13 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
         ctx->stats.waitWrite++;
         ctx->stats.writeCounter++;
+        reduceCounters(ctx);
         if (!ctx->completionMeasured) {
             ctx->completion = ZSTD_getCompletion(ctx->cctx);
             ctx->completionMeasured = 1;
         }
-        DEBUG(2, "job creation detected completion %f\n", ctx->completion);
+        adaptCompressionLevel(ctx);
+        DEBUG(3, "job creation detected completion %f\n", ctx->completion);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
     }