]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
change how completion is measured in compression thread
authorPaul Cruz <paulcruz74@fb.com>
Sun, 23 Jul 2017 17:18:54 +0000 (10:18 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Sun, 23 Jul 2017 17:18:54 +0000 (10:18 -0700)
contrib/adaptive-compression/adapt.c

index 68727cbd8aeebe327486d967213b50c4acb0e88b..248421457a9bf88617d5449aad26beee65040632 100644 (file)
@@ -25,7 +25,7 @@
 #define DEFAULT_DISPLAY_LEVEL 1
 #define DEFAULT_COMPRESSION_LEVEL 6
 #define DEFAULT_ADAPT_PARAM 0
-#define MAX_COMPRESSION_LEVEL_CHANGE 3
+#define MAX_COMPRESSION_LEVEL_CHANGE 2
 
 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@@ -207,6 +207,9 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
     ctx->compressWaitCreateCompletion = 1;
     ctx->compressWaitWriteCompletion = 1;
     ctx->writeWaitCompressionCompletion = 1;
+    ctx->createCompletion = 1;
+    ctx->writeCompletion = 1;
+    ctx->compressionCompletion = 1;
 
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
 
@@ -387,16 +390,34 @@ static void* compressionThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "compressionThread(): waiting on job ready\n");
+
+        {
+            /* check if compression thread will have to wait */
+            unsigned willWaitForCreate = 0;
+            unsigned willWaitForWrite = 0;
+
+            pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
+            if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1;
+            pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+            pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
+            if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
+            pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
+
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            if (willWaitForCreate || willWaitForWrite) {
+                ctx->compressWaitCreateCompletion = ctx->createCompletion;
+                ctx->compressWaitWriteCompletion = ctx->writeCompletion;
+                DEBUG(2, "compression will wait for create or write\n");
+                DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
+                DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
+            }
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+        }
+
         /* wait until job is ready */
         pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
         while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            /* compression thread is waiting on creation thread, take measurement */
-            ctx->compressWaitCreateCompletion = ctx->createCompletion;
-            DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
-            DEBUG(3, "create completion: %f\n", ctx->createCompletion);
-            DEBUG(2, "compression thread waiting for ready: %u, compressWaitCreateCompletion %f\n", currJob, ctx->compressWaitCreateCompletion);
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
         }
         pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
@@ -404,11 +425,6 @@ static void* compressionThread(void* arg)
         /* wait until job previously in this space is written */
         pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
         while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
-            /* compression thread is waiting on writer thread, take measurement */
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            ctx->compressWaitWriteCompletion = ctx->writeCompletion;
-            DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion);
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
         }
         pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
@@ -488,7 +504,7 @@ static void* compressionThread(void* arg)
                     /* update completion */
                     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
                     ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
-                    DEBUG(2, "compression completion %f\n", ctx->compressionCompletion);
+                    DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion);
                     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 }
             } while (remaining != 0);
@@ -579,7 +595,7 @@ static void* outputThread(void* arg)
                     /* update completion variable for writing */
                     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
                     ctx->writeCompletion = 1 - (double)remaining/compressedSize;
-                    DEBUG(2, "write completion %f\n", ctx->writeCompletion);
+                    DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion);
                     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
                     if (remaining == 0) break;
@@ -721,7 +737,7 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
                 remaining -= ret;
                 pthread_mutex_lock(&ctx->completion_mutex.pMutex);
                 ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
-                DEBUG(2, "create completion: %f\n", ctx->createCompletion);
+                DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion);
                 pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             }
             if (remaining != 0 && !feof(srcFile)) {