]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added priority decision making for adapt compression level
authorPaul Cruz <paulcruz74@fb.com>
Fri, 21 Jul 2017 16:26:35 +0000 (09:26 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Fri, 21 Jul 2017 16:26:35 +0000 (09:26 -0700)
contrib/adaptive-compression/adapt.c

index f25d22a4cbe4e146b6e70e248f2179d0a0170be4..758bf558999b400b3329d5faabb8eecf47c49837 100644 (file)
@@ -308,43 +308,42 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
  */
 static void adaptCompressionLevel(adaptCCtx* ctx)
 {
-    /* check if compression is too slow */
-    unsigned createChange;
-    unsigned writeChange;
-    unsigned compressionChange;
+    double createCompletion, compressionCompletion, writeCompletion;
+    double const threshold = 0.00001;
     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-    createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
-    writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
-    compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
-    DEBUG(2, "compression level %u\n", ctx->compressionLevel);
-    DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
-    DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
-    DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
+    createCompletion = ctx->createCompletionMeasured;
+    compressionCompletion = ctx->compressionCompletionMeasured;
+    writeCompletion = ctx->writeCompletionMeasured;
     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
-    {
-        unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
-
-        DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
-
-        if (compressionFastChange) {
-            DEBUG(2, "compression level too low\n");
-            ctx->compressionLevel += compressionFastChange;
-        }
-        else {
-            unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1);
-            DEBUG(2, "compression level too high\n");
-            ctx->compressionLevel -= compressionSlowChange;
-        }
+    DEBUG(2, "create completion: %f\n", createCompletion);
+    DEBUG(2, "compression completion: %f\n", compressionCompletion);
+    DEBUG(2, "write completion: %f\n", writeCompletion);
+    /* adapt compression based on bottleneck */
+    if (1 - createCompletion > threshold) {
+        /* job creation was not finished, compression thread waited */
+        unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - createCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+        DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
+        ctx->compressionLevel += change;
+    }
+    else if (1 - writeCompletion > threshold) {
+        /* write thread was not finished, compression thread waited */
+        unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - writeCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+        DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
+        ctx->compressionLevel += change;
+    }
+    else if (1 - compressionCompletion > threshold) {
+        /* compression thread was not finished, one of the other two threads waited */
+        unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - compressionCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+        DEBUG(2, "decreasing compression level %u by %u\n", ctx->compressionLevel, change);
+        ctx->compressionLevel -= change;
     }
-
     /* reset */
     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
     ctx->createCompletionMeasured = 1;
     ctx->compressionCompletionMeasured = 1;
     ctx->writeCompletionMeasured = 1;
     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-    DEBUG(2, "\n");
 
     if (g_forceCompressionLevel) {
         ctx->compressionLevel = g_compressionLevel;
@@ -375,7 +374,7 @@ static void* compressionThread(void* arg)
             /* compression thread is waiting, take measurements of write completion and read completion */
             ctx->createCompletionMeasured = ctx->createCompletion;
             ctx->writeCompletionMeasured = ctx->writeCompletion;
-            DEBUG(2, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
+            DEBUG(3, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
             DEBUG(3, "create completion: %f\n", ctx->createCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
@@ -516,7 +515,7 @@ static void* outputThread(void* arg)
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             /* write thread is waiting, take measurement of compression completion */
             ctx->compressionCompletionMeasured = ctx->compressionCompletion;
-            DEBUG(2, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
+            DEBUG(3, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
@@ -598,7 +597,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
         /* creation thread is waiting, take measurement of compression completion */
         ctx->compressionCompletionMeasured = ctx->compressionCompletion;
-        DEBUG(2, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
+        DEBUG(3, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
         DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);