]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
decrease completion requirements for change, move create thread wait, merge cases...
authorPaul Cruz <paulcruz74@fb.com>
Wed, 26 Jul 2017 17:05:10 +0000 (10:05 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 26 Jul 2017 17:05:10 +0000 (10:05 -0700)
contrib/adaptive-compression/adapt.c

index 16701e75dad2cb8a54f92a1a85fdb592f5bc96ce..1633b6bf8db84ba18b8ad2e8875d24a630653fea 100644 (file)
@@ -322,10 +322,10 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
 /* map completion percentages to values for changing compression level */
 static unsigned convertCompletionToChange(double completion)
 {
-    if (completion < 0.05) {
+    if (completion < 0.1) {
         return 2;
     }
-    else if (completion < 0.5) {
+    else if (completion < 0.65) {
         return 1;
     }
     else {
@@ -396,9 +396,9 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
 
         DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange);
     }
-    else if (1-compressWaitWriteCompletion > threshold) {
+    else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) {
         /* compress waiting on write */
-        double const completion = compressWaitWriteCompletion;
+        double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
         unsigned const change = convertCompletionToChange(completion);
         unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
         if (ctx->convergenceCounter > CONVERGENCE_LOWER_BOUND && boundChange != 0) {
@@ -406,26 +406,11 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
         }
         else if (boundChange != 0) {
             ctx->compressionLevel += boundChange;
+            ctx->cooldown = 0;
             ctx->convergenceCounter = 1;
         }
 
-        DEBUG(2, "compress waiting on write, tried to increase compression level by %u\n\n", boundChange);
-    }
-    else if (1-compressWaitCreateCompletion > threshold) {
-        /* compress waiting on create*/
-        /* use compressWaitCreateCompletion */
-        double const completion = compressWaitCreateCompletion;
-        unsigned const change = convertCompletionToChange(completion);
-        unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
-        if (ctx->convergenceCounter > CONVERGENCE_LOWER_BOUND && boundChange != 0) {
-            ctx->convergenceCounter = 0;
-        }
-        else if (boundChange != 0) {
-            ctx->compressionLevel += boundChange;
-            ctx->convergenceCounter = 1;
-        }
-
-        DEBUG(2, "compression waiting on create, tried to increase compression level by %u\n\n", boundChange);
+        DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange);
     }
 
     if (ctx->compressionLevel == prevCompressionLevel) {
@@ -689,17 +674,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     jobDescription* const job = &ctx->jobs[nextJobIndex];
 
 
-    /* wait until the job has been compressed */
-    pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
-    while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
-        pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
-        /* creation thread is waiting, take measurement of completion */
-        ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
-        DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
-        pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
-        pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
-    }
-    pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
     /* reset create completion */
     pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
     ctx->createCompletion = 0;
@@ -767,8 +741,22 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
             size_t pos = 0;
             size_t const readBlockSize = 1 << 15;
             size_t remaining = FILE_CHUNK_SIZE;
-
+            unsigned const nextJob = ctx->nextJobID;
             DEBUG(2, "starting creation of job %u\n", currJob);
+
+
+            /* wait until the job has been compressed */
+            pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+            while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
+                pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+                /* creation thread is waiting, take measurement of completion */
+                ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
+                DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
+                pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+                pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
+            }
+            pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
             while (remaining != 0 && !feof(srcFile)) {
                 size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
                 if (ret != readBlockSize && !feof(srcFile)) {