]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added progress check for filewriting, put important shared data behind mutex when...
authorPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 22:23:11 +0000 (15:23 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 22:23:11 +0000 (15:23 -0700)
contrib/adaptive-compression/adapt.c

index 404db6d94923b9d21964e8bb0f94945c3cbbcb74..b7f4dccf2655715b8aaf43762e5761dda030cb10 100644 (file)
@@ -88,7 +88,9 @@ typedef struct {
     unsigned allJobsCompleted;
     unsigned adaptParam;
     unsigned compressionCompletionMeasured;
+    unsigned writeCompletionMeasured;
     double compressionCompletion;
+    double writeCompletion;
     mutex_t jobCompressed_mutex;
     cond_t jobCompressed_cond;
     mutex_t jobReady_mutex;
@@ -97,6 +99,8 @@ typedef struct {
     cond_t allJobsCompleted_cond;
     mutex_t jobWrite_mutex;
     cond_t jobWrite_cond;
+    mutex_t completion_mutex;
+    mutex_t stats_mutex;
     size_t lastDictSize;
     inBuff_t input;
     cStat_t stats;
@@ -156,6 +160,8 @@ static int freeCCtx(adaptCCtx* ctx)
         error |= destroyCond(&ctx->allJobsCompleted_cond);
         error |= destroyMutex(&ctx->jobWrite_mutex);
         error |= destroyCond(&ctx->jobWrite_cond);
+        error |= destroyMutex(&ctx->completion_mutex);
+        error |= destroyMutex(&ctx->stats_mutex);
         error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
         free(ctx->input.buffer.start);
         if (ctx->jobs){
@@ -200,6 +206,8 @@ static adaptCCtx* createCCtx(unsigned numJobs)
         pthreadError |= initCond(&ctx->allJobsCompleted_cond);
         pthreadError |= initMutex(&ctx->jobWrite_mutex);
         pthreadError |= initCond(&ctx->jobWrite_cond);
+        pthreadError |= initMutex(&ctx->completion_mutex);
+        pthreadError |= initMutex(&ctx->stats_mutex);
         if (pthreadError) return NULL;
     }
     ctx->numJobs = numJobs;
@@ -315,24 +323,44 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
         }
         else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
             DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
-            ctx->compressionLevel++;
-            reset = 1;
+            double completion;
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            completion = ctx->writeCompletion;
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+            {
+                unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
+                unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
+                DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
+                DEBUG(2, "write completion: %f\n", completion);
+                ctx->compressionLevel += change;
+                reset = 1;
+            }
         }
         else if (compressSlow && ctx->compressionLevel > 1) {
-            double const completion = ctx->compressionCompletion;
-            unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
-            unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
-            DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
-            DEBUG(3, "completion: %f\n", completion);
-            ctx->compressionLevel -= change;
-            reset = 1;
+            double completion;
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            completion = ctx->compressionCompletion;
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+            {
+                unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
+                unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
+                DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
+                DEBUG(3, "completion: %f\n", completion);
+                ctx->compressionLevel -= change;
+                reset = 1;
+            }
         }
         if (reset) {
             ctx->stats.readyCounter = 0;
             ctx->stats.writeCounter = 0;
             ctx->stats.compressedCounter = 0;
+
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             ctx->compressionCompletion = 1;
             ctx->compressionCompletionMeasured = 0;
+            ctx->writeCompletion = 1;
+            ctx->writeCompletionMeasured = 0;
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         }
     }
 }
@@ -455,12 +483,14 @@ static void* outputThread(void* arg)
             ctx->stats.waitCompressed++;
             ctx->stats.compressedCounter++;
             reduceCounters(ctx);
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             if (!ctx->compressionCompletionMeasured) {
                 ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
                 ctx->compressionCompletionMeasured = 1;
+                DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
             }
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             adaptCompressionLevel(ctx);
-            DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
         }
@@ -484,6 +514,14 @@ static void* outputThread(void* arg)
                     if (ret != writeSize) break;
                     pos += ret;
                     remaining -= ret;
+
+                    /* update completion variable for writing */
+                    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                    if (!ctx->writeCompletionMeasured) {
+                        ctx->writeCompletion = 1 - (double)remaining/compressedSize;
+                    }
+                    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
                     if (remaining == 0) break;
                 }
                 if (pos != compressedSize) {
@@ -528,12 +566,10 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
         ctx->stats.waitWrite++;
         ctx->stats.writeCounter++;
         reduceCounters(ctx);
-        if (!ctx->compressionCompletion) {
-            ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
-            ctx->compressionCompletionMeasured = 1;
-        }
+        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        ctx->writeCompletionMeasured = 1;
+        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         adaptCompressionLevel(ctx);
-        DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
     }
@@ -552,10 +588,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
         ctx->input.buffer.start = copy;
     }
     job->dictSize = ctx->lastDictSize;
-    pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
-    ctx->jobReadyID++;
-    pthread_cond_signal(&ctx->jobReady_cond.pCond);
-    pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
     DEBUG(3, "finished job creation %u\n", nextJob);
     ctx->nextJobID++;
     DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
@@ -567,6 +600,13 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
         ctx->lastDictSize = srcSize;
         ctx->input.filled = srcSize;
     }
+
+    /* signal job ready */
+    pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
+    ctx->jobReadyID++;
+    pthread_cond_signal(&ctx->jobReady_cond.pCond);
+    pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
     return 0;
 }