]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added mechanism for measuring how much of a job has been created
authorPaul Cruz <paulcruz74@fb.com>
Wed, 19 Jul 2017 17:10:47 +0000 (10:10 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 19 Jul 2017 17:10:47 +0000 (10:10 -0700)
contrib/adaptive-compression/adapt.c

index d3ffe6a8109e83a83c9bdccff495b50454366171..de2e5e1397ac50d5389b3929ba1ac532bc3b02a6 100644 (file)
@@ -89,8 +89,10 @@ typedef struct {
     unsigned adaptParam;
     unsigned compressionCompletionMeasured;
     unsigned writeCompletionMeasured;
+    unsigned createCompletionMeasured;
     double compressionCompletion;
     double writeCompletion;
+    double createCompletion;
     mutex_t jobCompressed_mutex;
     cond_t jobCompressed_cond;
     mutex_t jobReady_mutex;
@@ -344,7 +346,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
             unsigned const writeSlow = (compressWaiting && createWaiting);
             unsigned const compressSlow = (writeWaiting && createWaiting);
             unsigned const createSlow = (compressWaiting && writeWaiting);
-            DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
+            DEBUG(3, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
             if (allSlow) {
                 reset = 1;
             }
@@ -352,13 +354,14 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
                 DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
                 double completion;
                 pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                completion = ctx->writeCompletion;
+                completion = writeSlow ? ctx->writeCompletion : ctx->createCompletion;
+                DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion);
                 pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 {
                     unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
                     unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
-                    DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
-                    DEBUG(2, "write completion: %f\n", completion);
+                    DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
+                    DEBUG(3, "write completion: %f\n", completion);
                     ctx->compressionLevel += change;
                     reset = 1;
                 }
@@ -418,6 +421,9 @@ static void* compressionThread(void* arg)
             ctx->stats.readyCounter++;
             pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
             reduceCounters(ctx);
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            ctx->createCompletionMeasured = 1;
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             adaptCompressionLevel(ctx);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
@@ -695,6 +701,12 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
             }
             pos += ret;
             remaining -= ret;
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            if (!ctx->createCompletionMeasured) {
+                ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
+            }
+            DEBUG(3, "create completion: %f\n", ctx->createCompletion);
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         }
         if (remaining != 0 && !feof(srcFile)) {
             DISPLAY("Error: problem occurred during read from src file\n");