]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
reduced competition for completion mutex by separating mutex use based on which value...
authorPaul Cruz <paulcruz74@fb.com>
Sun, 23 Jul 2017 21:09:16 +0000 (14:09 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Sun, 23 Jul 2017 21:09:16 +0000 (14:09 -0700)
contrib/adaptive-compression/adapt.c

index 248421457a9bf88617d5449aad26beee65040632..4e5a86c72123108feaec36ba8946ff1c29e73233 100644 (file)
@@ -92,8 +92,9 @@ typedef struct {
     cond_t allJobsCompleted_cond;
     mutex_t jobWrite_mutex;
     cond_t jobWrite_cond;
-    mutex_t completion_mutex;
-    mutex_t wait_mutex;
+    mutex_t compressionCompletion_mutex;
+    mutex_t createCompletion_mutex;
+    mutex_t writeCompletion_mutex;
     size_t lastDictSize;
     inBuff_t input;
     jobDescription* jobs;
@@ -152,8 +153,9 @@ static int freeCCtx(adaptCCtx* ctx)
         error |= destroyCond(&ctx->allJobsCompleted_cond);
         error |= destroyMutex(&ctx->jobWrite_mutex);
         error |= destroyCond(&ctx->jobWrite_cond);
-        error |= destroyMutex(&ctx->completion_mutex);
-        error |= destroyMutex(&ctx->wait_mutex);
+        error |= destroyMutex(&ctx->compressionCompletion_mutex);
+        error |= destroyMutex(&ctx->createCompletion_mutex);
+        error |= destroyMutex(&ctx->writeCompletion_mutex);
         error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
         free(ctx->input.buffer.start);
         if (ctx->jobs){
@@ -192,8 +194,9 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
         pthreadError |= initCond(&ctx->allJobsCompleted_cond);
         pthreadError |= initMutex(&ctx->jobWrite_mutex);
         pthreadError |= initCond(&ctx->jobWrite_cond);
-        pthreadError |= initMutex(&ctx->completion_mutex);
-        pthreadError |= initMutex(&ctx->wait_mutex);
+        pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
+        pthreadError |= initMutex(&ctx->createCompletion_mutex);
+        pthreadError |= initMutex(&ctx->writeCompletion_mutex);
         if (pthreadError) return pthreadError;
     }
     ctx->numJobs = numJobs;
@@ -323,28 +326,32 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
 
     DEBUG(2, "adapting compression level %u\n", ctx->compressionLevel);
     /* read and reset completion measurements */
-    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+
+    pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
     DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion);
-    DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
-    DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
     DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion);
-
     createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
-    compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
-    compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
     writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
-
-    DEBUG(2, "resetting adaptive variables\n");
     ctx->createWaitCompressionCompletion = 1;
-    ctx->compressWaitCreateCompletion = 1;
-    ctx->compressWaitWriteCompletion = 1;
     ctx->writeWaitCompressionCompletion = 1;
-    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+    pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
+
+    pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+    DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
+    compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
+    ctx->compressWaitWriteCompletion = 1;
+    pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
+
+    pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+    DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
+    compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
+    ctx->compressWaitCreateCompletion = 1;
+    pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
 
     /* adaptation logic */
-    if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) {
-        /* both create and write threads waiting on compression */
-        /* use writeWaitCompressionCompletion */
+    if (1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) {
+        /* compression waiting on either create or write */
+        /* use whichever one waited less because it was slower */
         double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
         unsigned const change = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
         unsigned const boundChange = MIN(change, ctx->compressionLevel - 1);
@@ -404,15 +411,21 @@ static void* compressionThread(void* arg)
             if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
             pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
 
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+
             if (willWaitForCreate || willWaitForWrite) {
-                ctx->compressWaitCreateCompletion = ctx->createCompletion;
-                ctx->compressWaitWriteCompletion = ctx->writeCompletion;
                 DEBUG(2, "compression will wait for create or write\n");
+
+                pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
+                ctx->compressWaitCreateCompletion = ctx->createCompletion;
                 DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);
+                pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
+
+                pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
+                ctx->compressWaitWriteCompletion = ctx->writeCompletion;
                 DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
+                pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
             }
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
         }
 
         /* wait until job is ready */
@@ -429,9 +442,9 @@ static void* compressionThread(void* arg)
         }
         pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
         /* reset compression completion */
-        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
         ctx->compressionCompletion = 0;
-        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+        pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
 
         DEBUG(3, "compressionThread(): continuing after job ready\n");
         DEBUG(3, "DICTIONARY ENDED\n");
@@ -502,10 +515,10 @@ static void* compressionThread(void* arg)
                     blockNum++;
 
                     /* update completion */
-                    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                    pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
                     ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
                     DEBUG(2, "compression completion %u %f\n", currJob, ctx->compressionCompletion);
-                    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+                    pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
                 }
             } while (remaining != 0);
             job->dst.size = job->compressedSize;
@@ -557,20 +570,20 @@ static void* outputThread(void* arg)
         DEBUG(3, "outputThread(): waiting on job compressed\n");
         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
             /* write thread is waiting on compression thread */
             ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
             DEBUG(3, "write thread waiting : writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
             DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+            pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
         }
         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
 
         /* reset write completion */
-        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
         ctx->writeCompletion = 0;
-        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+        pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
 
         DEBUG(3, "outputThread(): continuing after job compressed\n");
         {
@@ -593,10 +606,10 @@ static void* outputThread(void* arg)
                     remaining -= ret;
 
                     /* update completion variable for writing */
-                    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                    pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
                     ctx->writeCompletion = 1 - (double)remaining/compressedSize;
                     DEBUG(2, "write completion %u %f\n", currJob, ctx->writeCompletion);
-                    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+                    pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);
 
                     if (remaining == 0) break;
                 }
@@ -642,20 +655,20 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     /* wait until the job has been compressed */
     pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
     while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
-        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
         /* creation thread is waiting, take measurement of completion */
         ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
         DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
         DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
         DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
-        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+        pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
         pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
     }
     pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
     /* reset create completion */
-    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+    pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
     ctx->createCompletion = 0;
-    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+    pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
     DEBUG(3, "createCompressionJob(): continuing after job write\n");
 
     DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
@@ -735,10 +748,10 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
                 }
                 pos += ret;
                 remaining -= ret;
-                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
                 ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
                 DEBUG(2, "create completion %u %f\n", currJob, ctx->createCompletion);
-                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+                pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
             }
             if (remaining != 0 && !feof(srcFile)) {
                 DISPLAY("Error: problem occurred during read from src file\n");