]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
changed how completion is actually sampled
authorPaul Cruz <paulcruz74@fb.com>
Thu, 20 Jul 2017 17:53:51 +0000 (10:53 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Thu, 20 Jul 2017 17:53:51 +0000 (10:53 -0700)
contrib/adaptive-compression/adapt.c

index ce92d9140d1f7b8d73ab6c9c016f36d4a834d3c6..cf232ca7b26ea969dd7f360393da1f8d1845a3ef 100644 (file)
@@ -24,8 +24,8 @@
 #define MAX_PATH 256
 #define DEFAULT_DISPLAY_LEVEL 1
 #define DEFAULT_COMPRESSION_LEVEL 6
-#define DEFAULT_ADAPT_PARAM 1
-#define MAX_COMPRESSION_LEVEL_CHANGE 3
+#define DEFAULT_ADAPT_PARAM 0
+#define MAX_COMPRESSION_LEVEL_CHANGE 4
 
 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@@ -87,9 +87,9 @@ typedef struct {
     unsigned jobWriteID;
     unsigned allJobsCompleted;
     unsigned adaptParam;
-    unsigned compressionCompletionMeasured;
-    unsigned writeCompletionMeasured;
-    unsigned createCompletionMeasured;
+    double compressionCompletionMeasured;
+    double writeCompletionMeasured;
+    double createCompletionMeasured;
     double compressionCompletion;
     double writeCompletion;
     double createCompletion;
@@ -342,6 +342,9 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
         writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
         createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
         pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
+        DEBUG(2, "createWaiting %u\n", createWaiting);
+        DEBUG(2, "compressWaiting %u\n", compressWaiting);
+        DEBUG(2, "writeWaiting %u\n\n", writeWaiting);
         {
             unsigned const writeSlow = (compressWaiting && createWaiting);
             unsigned const compressSlow = (writeWaiting && createWaiting);
@@ -351,14 +354,14 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
                 reset = 1;
             }
             else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
-                DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
+                DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel);
                 double completion;
                 pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                completion = writeSlow ? ctx->writeCompletion : ctx->createCompletion;
-                DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion);
+                completion = writeSlow ? ctx->writeCompletionMeasured : ctx->createCompletionMeasured;
+                DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletionMeasured, ctx->createCompletionMeasured);
                 pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 {
-                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
+                    unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
                     unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel);
                     DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
                     DEBUG(3, "write completion: %f\n", completion);
@@ -369,13 +372,13 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
             else if (compressSlow && ctx->compressionLevel > 1) {
                 double completion;
                 pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                completion = ctx->compressionCompletion;
+                completion = ctx->compressionCompletionMeasured;
                 pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 {
-                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
+                    unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
                     unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
-                    DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
-                    DEBUG(3, "completion: %f\n", completion);
+                    DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel);
+                    DEBUG(2, "completion: %f\n", completion);
                     ctx->compressionLevel -= change;
                     reset = 1;
                 }
@@ -386,15 +389,6 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
                 ctx->stats.writeCounter = 0;
                 ctx->stats.compressedCounter = 0;
                 pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
-
-                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                ctx->compressionCompletion = 0;
-                ctx->compressionCompletionMeasured = 0;
-                ctx->writeCompletion = 0;
-                ctx->writeCompletionMeasured = 0;
-                ctx->createCompletion = 0;
-                ctx->createCompletionMeasured = 0;
-                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             }
         }
     }
@@ -424,8 +418,8 @@ static void* compressionThread(void* arg)
             pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
             reduceCounters(ctx);
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            ctx->createCompletionMeasured = 1;
-            DEBUG(2, "create completion: %f\n", ctx->createCompletion);
+            ctx->createCompletionMeasured = ctx->createCompletion;
+            DEBUG(3, "create completion: %f\n", ctx->createCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
@@ -502,9 +496,8 @@ static void* compressionThread(void* arg)
 
                     /* update completion */
                     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                    if (!ctx->compressionCompletionMeasured) {
-                        ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
-                    }
+                    ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
+                    DEBUG(2, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
                     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 }
             } while (remaining != 0);
@@ -562,8 +555,8 @@ static void* outputThread(void* arg)
             pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
             reduceCounters(ctx);
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            ctx->compressionCompletionMeasured = 1;
-            DEBUG(2, "compressionCompletion %f\n", ctx->compressionCompletion);
+            ctx->compressionCompletionMeasured = ctx->compressionCompletion;
+            DEBUG(2, "waited on job %u: compressionCompletion %f\n", currJob, ctx->compressionCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
@@ -580,7 +573,7 @@ static void* outputThread(void* arg)
             }
             {
                 // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
-                size_t const blockSize = 64 << 10; /* 64 KB */
+                size_t const blockSize = compressedSize >> 7;
                 size_t pos = 0;
                 for ( ; ; ) {
                     size_t const writeSize = MIN(remaining, blockSize);
@@ -591,9 +584,7 @@ static void* outputThread(void* arg)
 
                     /* update completion variable for writing */
                     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                    if (!ctx->writeCompletionMeasured) {
-                        ctx->writeCompletion = 1 - (double)remaining/compressedSize;
-                    }
+                    ctx->writeCompletion = 1 - (double)remaining/compressedSize;
                     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
                     if (remaining == 0) break;
@@ -643,8 +634,8 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
         pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
         reduceCounters(ctx);
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        ctx->writeCompletionMeasured = 1;
-        DEBUG(2, "writeCompletion: %f\n", ctx->writeCompletion);
+        ctx->writeCompletionMeasured = ctx->writeCompletion;
+        DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
@@ -736,9 +727,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
             pos += ret;
             remaining -= ret;
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            if (!ctx->createCompletionMeasured) {
-                ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
-            }
+            ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
             DEBUG(3, "create completion: %f\n", ctx->createCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         }