]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fix leaky abstraction regarding measuring completion
authorPaul Cruz <paulcruz74@fb.com>
Wed, 26 Jul 2017 23:40:05 +0000 (16:40 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 26 Jul 2017 23:40:05 +0000 (16:40 -0700)
contrib/adaptive-compression/adapt.c

index 05da8b7f731594c0cdea21692726e63a65f6abff..a16ab40cc91619f754152e2c58bfd6925e3fc5f3 100644 (file)
@@ -367,20 +367,16 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
     DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
     createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
     writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
-    ctx->createWaitCompressionCompletion = 1;
-    ctx->writeWaitCompressionCompletion = 1;
     pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
 
     pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
     DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion);
     compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
-    ctx->compressWaitWriteCompletion = 1;
     pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
 
     pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
     DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
     compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
-    ctx->compressWaitCreateCompletion = 1;
     pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
     DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);
 
@@ -462,22 +458,28 @@ static void* compressionThread(void* arg)
             pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
 
 
+            pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
             if (willWaitForCreate) {
                 DEBUG(2, "compression will wait for create on job %u\n", currJob);
-                pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
                 ctx->compressWaitCreateCompletion = ctx->createCompletion;
                 DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
-                pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
 
             }
+            else {
+                ctx->compressWaitCreateCompletion = 1;
+            }
+            pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
 
+            pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
             if (willWaitForWrite) {
                 DEBUG(2, "compression will wait for write on job %u\n", currJob);
-                pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
                 ctx->compressWaitWriteCompletion = ctx->writeCompletion;
                 DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
-                pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
             }
+            else {
+                ctx->compressWaitWriteCompletion = 1;
+            }
+            pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
 
         }
 
@@ -610,14 +612,27 @@ static void* outputThread(void* arg)
     for ( ; ; ) {
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* const job = &ctx->jobs[currJobIndex];
+        unsigned willWaitForCompress = 0;
         DEBUG(2, "starting write for job %u\n", currJob);
+
         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
-        while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
-            pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+        if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1;
+        pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+
+        pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+        if (willWaitForCompress) {
             /* write thread is waiting on compression thread */
             ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
             DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
-            pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+        }
+        else {
+            ctx->writeWaitCompressionCompletion = 1;
+        }
+        pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+        pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+        while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
         }
         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
@@ -751,17 +766,27 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
             size_t const readBlockSize = 1 << 15;
             size_t remaining = FILE_CHUNK_SIZE;
             unsigned const nextJob = ctx->nextJobID;
+            unsigned willWaitForCompress = 0;
             DEBUG(2, "starting creation of job %u\n", currJob);
 
-
-            /* wait until the job has been compressed */
             pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
-            while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
-                pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+            if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1;
+            pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+            pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
+            if (willWaitForCompress) {
                 /* creation thread is waiting, take measurement of completion */
                 ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
                 DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
-                pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+            }
+            else {
+                ctx->createWaitCompressionCompletion = 1;
+            }
+            pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+            /* wait until the job has been compressed */
+            pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+            while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
                 pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
             }
             pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);