]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added mutex for compression level to avoid data race
authorPaul Cruz <paulcruz74@fb.com>
Wed, 2 Aug 2017 17:27:33 +0000 (10:27 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 2 Aug 2017 17:27:33 +0000 (10:27 -0700)
contrib/adaptive-compression/adapt.c

index eef6c93266411373d5ff5b2353e7529b4f5df760..7cbf2c99a4ef2fc3a6b7e47ba4e0a275ab66ce4b 100644 (file)
@@ -59,7 +59,6 @@ typedef struct {
 typedef struct {
     buffer_t src;
     buffer_t dst;
-    unsigned compressionLevel;
     unsigned jobID;
     unsigned lastJobPlusOne;
     size_t compressedSize;
@@ -78,7 +77,6 @@ typedef struct {
 
 typedef struct {
     unsigned compressionLevel;
-    unsigned numActiveThreads;
     unsigned numJobs;
     unsigned nextJobID;
     unsigned threadError;
@@ -141,6 +139,7 @@ typedef struct {
     mutex_t compressionCompletion_mutex;
     mutex_t createCompletion_mutex;
     mutex_t writeCompletion_mutex;
+    mutex_t compressionLevel_mutex;
     size_t lastDictSize;
     inBuff_t input;
     jobDescription* jobs;
@@ -202,6 +201,7 @@ static int freeCCtx(adaptCCtx* ctx)
         error |= destroyMutex(&ctx->compressionCompletion_mutex);
         error |= destroyMutex(&ctx->createCompletion_mutex);
         error |= destroyMutex(&ctx->writeCompletion_mutex);
+        error |= destroyMutex(&ctx->compressionLevel_mutex);
         error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
         free(ctx->input.buffer.start);
         if (ctx->jobs){
@@ -243,6 +243,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
         pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
         pthreadError |= initMutex(&ctx->createCompletion_mutex);
         pthreadError |= initMutex(&ctx->writeCompletion_mutex);
+        pthreadError |= initMutex(&ctx->compressionLevel_mutex);
         if (pthreadError) return pthreadError;
     }
     ctx->numJobs = numJobs;
@@ -384,16 +385,22 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
     double compressWaitWriteCompletion;
     double writeWaitCompressionCompletion;
     double const threshold = 0.00001;
-    unsigned const prevCompressionLevel = ctx->compressionLevel;
+    unsigned prevCompressionLevel;
+
+    pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+    prevCompressionLevel = ctx->compressionLevel;
+    pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
 
 
     if (g_forceCompressionLevel) {
+        pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
         ctx->compressionLevel = g_compressionLevel;
+        pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
         return;
     }
 
 
-    DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel);
+    DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);
 
     /* read and reset completion measurements */
     pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
@@ -414,7 +421,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
     pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
     DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
 
-    assert(g_minCLevel <= ctx->compressionLevel && g_maxCLevel >= ctx->compressionLevel);
+    assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);
 
     /* adaptation logic */
     if (ctx->cooldown) ctx->cooldown--;
@@ -424,14 +431,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
         /* use whichever one waited less because it was slower */
         double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
         unsigned const change = convertCompletionToChange(completion);
-        unsigned const boundChange = MIN(change, ctx->compressionLevel - g_minCLevel);
+        unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
         if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
             /* reset convergence counter, might have been a spike */
             ctx->convergenceCounter = 0;
             DEBUG(2, "convergence counter reset, no change applied\n");
         }
         else if (boundChange != 0) {
+            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
             ctx->compressionLevel -= boundChange;
+            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
             ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
             ctx->convergenceCounter = 1;
 
@@ -442,14 +451,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
         /* compress waiting on write */
         double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
         unsigned const change = convertCompletionToChange(completion);
-        unsigned const boundChange = MIN(change, g_maxCLevel - ctx->compressionLevel);
+        unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
         if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
             /* reset convergence counter, might have been a spike */
             ctx->convergenceCounter = 0;
             DEBUG(2, "convergence counter reset, no change applied\n");
         }
         else if (boundChange != 0) {
+            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
             ctx->compressionLevel += boundChange;
+            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
             ctx->cooldown = 0;
             ctx->convergenceCounter = 1;
 
@@ -458,9 +469,11 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
 
     }
 
+    pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
     if (ctx->compressionLevel == prevCompressionLevel) {
         ctx->convergenceCounter++;
     }
+    pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
 }
 
 static size_t getUseableDictSize(unsigned compressionLevel)
@@ -540,15 +553,23 @@ static void* compressionThread(void* arg)
         /* adapt compression level */
         if (currJob) adaptCompressionLevel(ctx);
 
+        pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
         DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
+        pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+
         /* compress the data */
         {
             size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
-            unsigned const cLevel = ctx->compressionLevel;
+            unsigned cLevel;
             unsigned blockNum = 0;
             size_t remaining = job->src.size;
             size_t srcPos = 0;
             size_t dstPos = 0;
+
+            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+            cLevel = ctx->compressionLevel;
+            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+
             /* reset compressed size */
             job->compressedSize = 0;
             DEBUG(2, "calling ZSTD_compressBegin()\n");
@@ -712,7 +733,13 @@ static void* outputThread(void* arg)
                 }
             }
         }
-        displayProgress(ctx->compressionLevel, job->lastJobPlusOne == currJob + 1);
+        {
+            unsigned cLevel;
+            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
+            cLevel = ctx->compressionLevel;
+            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
+            displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
+        }
         pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
         ctx->jobWriteID++;
         pthread_cond_signal(&ctx->jobWrite_cond.pCond);
@@ -740,7 +767,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     jobDescription* const job = &ctx->jobs[nextJobIndex];
 
 
-    job->compressionLevel = ctx->compressionLevel;
     job->src.size = srcSize;
     job->jobID = nextJob;
     if (last) job->lastJobPlusOne = nextJob + 1;