]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added mutex for stats struct
authorPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 22:55:58 +0000 (15:55 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 22:55:58 +0000 (15:55 -0700)
contrib/adaptive-compression/adapt.c

index b7f4dccf2655715b8aaf43762e5761dda030cb10..35e26abb74833a4c73ec1df5051643bb74f49ca5 100644 (file)
@@ -288,10 +288,12 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
 /* this function normalizes counters when compression level is changing */
 static void reduceCounters(adaptCCtx* ctx)
 {
+    pthread_mutex_lock(&ctx->stats_mutex.pMutex);
     unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
     ctx->stats.writeCounter -= min;
     ctx->stats.compressedCounter -= min;
     ctx->stats.readyCounter -= min;
+    pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
 }
 
 /*
@@ -309,58 +311,68 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
     }
     else {
         unsigned reset = 0;
-        unsigned const allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
-        unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
-        unsigned const writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
-        unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
-        unsigned const writeSlow = (compressWaiting && createWaiting);
-        unsigned const compressSlow = (writeWaiting && createWaiting);
-        unsigned const createSlow = (compressWaiting && writeWaiting);
-        DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
-        DEBUG(2, "ready: %u compressed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.compressedCounter, ctx->stats.writeCounter);
-        if (allSlow) {
-            reset = 1;
-        }
-        else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
-            DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
-            double completion;
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            completion = ctx->writeCompletion;
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-            {
-                unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
-                unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
-                DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
-                DEBUG(2, "write completion: %f\n", completion);
-                ctx->compressionLevel += change;
+        unsigned allSlow;
+        unsigned compressWaiting;
+        unsigned writeWaiting;
+        unsigned createWaiting;
+
+        pthread_mutex_lock(&ctx->stats_mutex.pMutex);
+        allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
+        compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
+        writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
+        createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
+        pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
+        {
+            unsigned const writeSlow = (compressWaiting && createWaiting);
+            unsigned const compressSlow = (writeWaiting && createWaiting);
+            unsigned const createSlow = (compressWaiting && writeWaiting);
+            DEBUG(2, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
+            if (allSlow) {
                 reset = 1;
             }
-        }
-        else if (compressSlow && ctx->compressionLevel > 1) {
-            double completion;
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            completion = ctx->compressionCompletion;
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-            {
-                unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
-                unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
-                DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
-                DEBUG(3, "completion: %f\n", completion);
-                ctx->compressionLevel -= change;
-                reset = 1;
+            else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
+                DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
+                double completion;
+                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                completion = ctx->writeCompletion;
+                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+                {
+                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
+                    unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
+                    DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
+                    DEBUG(2, "write completion: %f\n", completion);
+                    ctx->compressionLevel += change;
+                    reset = 1;
+                }
+            }
+            else if (compressSlow && ctx->compressionLevel > 1) {
+                double completion;
+                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                completion = ctx->compressionCompletion;
+                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+                {
+                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
+                    unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
+                    DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
+                    DEBUG(3, "completion: %f\n", completion);
+                    ctx->compressionLevel -= change;
+                    reset = 1;
+                }
+            }
+            if (reset) {
+                pthread_mutex_lock(&ctx->stats_mutex.pMutex);
+                ctx->stats.readyCounter = 0;
+                ctx->stats.writeCounter = 0;
+                ctx->stats.compressedCounter = 0;
+                pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
+
+                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                ctx->compressionCompletion = 1;
+                ctx->compressionCompletionMeasured = 0;
+                ctx->writeCompletion = 1;
+                ctx->writeCompletionMeasured = 0;
+                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             }
-        }
-        if (reset) {
-            ctx->stats.readyCounter = 0;
-            ctx->stats.writeCounter = 0;
-            ctx->stats.compressedCounter = 0;
-
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            ctx->compressionCompletion = 1;
-            ctx->compressionCompletionMeasured = 0;
-            ctx->writeCompletion = 1;
-            ctx->writeCompletionMeasured = 0;
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         }
     }
 }
@@ -383,8 +395,10 @@ static void* compressionThread(void* arg)
         DEBUG(3, "compressionThread(): waiting on job ready\n");
         pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
         while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
+            pthread_mutex_lock(&ctx->stats_mutex.pMutex);
             ctx->stats.waitReady++;
             ctx->stats.readyCounter++;
+            pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
             reduceCounters(ctx);
             adaptCompressionLevel(ctx);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
@@ -480,8 +494,10 @@ static void* outputThread(void* arg)
         DEBUG(3, "outputThread(): waiting on job compressed\n");
         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
+            pthread_mutex_lock(&ctx->stats_mutex.pMutex);
             ctx->stats.waitCompressed++;
             ctx->stats.compressedCounter++;
+            pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
             reduceCounters(ctx);
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             if (!ctx->compressionCompletionMeasured) {
@@ -563,8 +579,10 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
     DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
     while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
+        pthread_mutex_lock(&ctx->stats_mutex.pMutex);
         ctx->stats.waitWrite++;
         ctx->stats.writeCounter++;
+        pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
         reduceCounters(ctx);
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
         ctx->writeCompletionMeasured = 1;
@@ -719,7 +737,9 @@ static int freeFileCompressionResources(fcResources* fcr)
 {
     int ret = 0;
     waitUntilAllJobsCompleted(fcr->ctx);
+    pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex);
     if (g_displayStats) printStats(fcr->ctx->stats);
+    pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex);
     ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
     ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
     if (fcr->otArg) {