]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
measure multiple completion levels during each wait
authorPaul Cruz <paulcruz74@fb.com>
Fri, 21 Jul 2017 20:38:24 +0000 (13:38 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Fri, 21 Jul 2017 20:38:24 +0000 (13:38 -0700)
contrib/adaptive-compression/adapt.c

index c542c9a3db051863ed2a086c8c83ca318350b358..b466dbdd8c1ba1178f7fbc9852b1b6a7ea730678 100644 (file)
@@ -25,7 +25,7 @@
 #define DEFAULT_DISPLAY_LEVEL 1
 #define DEFAULT_COMPRESSION_LEVEL 6
 #define DEFAULT_ADAPT_PARAM 0
-#define MAX_COMPRESSION_LEVEL_CHANGE 4
+#define MAX_COMPRESSION_LEVEL_CHANGE 3
 
 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@@ -77,9 +77,12 @@ typedef struct {
     unsigned jobWriteID;
     unsigned allJobsCompleted;
     unsigned adaptParam;
-    double compressionCompletionMeasured;
-    double writeCompletionMeasured;
-    double createCompletionMeasured;
+    double createWaitWriteCompletion;
+    double createWaitCompressionCompletion;
+    double compressWaitCreateCompletion;
+    double compressWaitWriteCompletion;
+    double writeWaitCreateCompletion;
+    double writeWaitCompressionCompletion;
     double compressionCompletion;
     double writeCompletion;
     double createCompletion;
@@ -200,9 +203,14 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
     ctx->jobCompressedID = 0;
     ctx->jobWriteID = 0;
     ctx->lastDictSize = 0;
-    ctx->createCompletionMeasured = 1;
-    ctx->compressionCompletionMeasured = 1;
-    ctx->writeCompletionMeasured = 1;
+
+
+    ctx->createWaitWriteCompletion = 1;
+    ctx->createWaitCompressionCompletion = 1;
+    ctx->compressWaitCreateCompletion = 1;
+    ctx->compressWaitWriteCompletion = 1;
+    ctx->writeWaitCreateCompletion = 1;
+    ctx->writeWaitCompressionCompletion = 1;
 
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
 
@@ -308,45 +316,61 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
  */
 static void adaptCompressionLevel(adaptCCtx* ctx)
 {
-    double createCompletion, compressionCompletion, writeCompletion;
+    double createWaitWriteCompletion;
+    double createWaitCompressionCompletion;
+    double compressWaitCreateCompletion;
+    double compressWaitWriteCompletion;
+    double writeWaitCreateCompletion;
+    double writeWaitCompressionCompletion;
     double const threshold = 0.00001;
+
+
+    /* read and reset completion measurements */
     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-    createCompletion = ctx->createCompletionMeasured;
-    compressionCompletion = ctx->compressionCompletionMeasured;
-    writeCompletion = ctx->writeCompletionMeasured;
+    DEBUG(2, "rc %f\n", ctx->createWaitCompressionCompletion);
+    DEBUG(2, "rw %f\n", ctx->createWaitWriteCompletion);
+    DEBUG(2, "cr %f\n", ctx->compressWaitCreateCompletion);
+    DEBUG(2, "cw %f\n", ctx->compressWaitWriteCompletion);
+    DEBUG(2, "wr %f\n", ctx->writeWaitCreateCompletion);
+    DEBUG(2, "wc %f\n\n", ctx->writeWaitCompressionCompletion);
+
+    createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
+    createWaitWriteCompletion = ctx->createWaitWriteCompletion;
+    compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
+    compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
+    writeWaitCreateCompletion = ctx->writeWaitCreateCompletion;
+    writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
+
+    ctx->createWaitWriteCompletion = 1;
+    ctx->createWaitCompressionCompletion = 1;
+    ctx->compressWaitCreateCompletion = 1;
+    ctx->compressWaitWriteCompletion = 1;
+    ctx->writeWaitCreateCompletion = 1;
+    ctx->writeWaitCompressionCompletion = 1;
     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
-    DEBUG(2, "create completion: %f\n", createCompletion);
-    DEBUG(2, "compression completion: %f\n", compressionCompletion);
-    DEBUG(2, "write completion: %f\n", writeCompletion);
-    /* adapt compression based on bottleneck */
-    if (1 - createCompletion > threshold) {
-        /* job creation was not finished, compression thread waited */
-        unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - createCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+    /* adaptation logic */
+    if (1-createWaitCompressionCompletion > threshold && 1-writeWaitCompressionCompletion > threshold) {
+        /* both create and write threads waiting on compression */
+        /* use writeWaitCompressionCompletion */
+        unsigned const change = (unsigned)((1-writeWaitCompressionCompletion) * MAX_COMPRESSION_LEVEL_CHANGE);
+        unsigned const boundChange = MIN(change, ctx->compressionLevel - 1);
+        ctx->compressionLevel -= boundChange;
+    }
+    else if (1-createWaitWriteCompletion > threshold && 1-compressWaitWriteCompletion > threshold) {
+        /* both create and compression thread waiting on write */
+        /* use createWaitWriteCompletion */
+        unsigned const change = (unsigned)((1-createWaitWriteCompletion) * MAX_COMPRESSION_LEVEL_CHANGE);
         unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
-        DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
         ctx->compressionLevel += boundChange;
     }
-    else if (1 - writeCompletion > threshold) {
-        /* write thread was not finished, compression thread waited */
-        unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - writeCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
+    else if (1-writeWaitCreateCompletion > threshold && 1-compressWaitCreateCompletion > threshold) {
+        /* both compression and write waiting on create */
+        /* use compressWaitCreateCompletion */
+        unsigned const change = (unsigned)((1-compressWaitCreateCompletion) * MAX_COMPRESSION_LEVEL_CHANGE);
         unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel);
-        DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change);
         ctx->compressionLevel += boundChange;
     }
-    else if (1 - compressionCompletion > threshold) {
-        /* compression thread was not finished, one of the other two threads waited */
-        unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - compressionCompletion * MAX_COMPRESSION_LEVEL_CHANGE;
-        unsigned const boundChange = MIN(change, ctx->compressionLevel - 1);
-        DEBUG(2, "decreasing compression level %u by %u\n", ctx->compressionLevel, change);
-        ctx->compressionLevel -= boundChange;
-    }
-    /* reset */
-    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-    ctx->createCompletionMeasured = 1;
-    ctx->compressionCompletionMeasured = 1;
-    ctx->writeCompletionMeasured = 1;
-    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
     if (g_forceCompressionLevel) {
         ctx->compressionLevel = g_compressionLevel;
@@ -375,9 +399,9 @@ static void* compressionThread(void* arg)
         while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             /* compression thread is waiting, take measurements of write completion and read completion */
-            ctx->createCompletionMeasured = ctx->createCompletion;
-            ctx->writeCompletionMeasured = ctx->writeCompletion;
-            DEBUG(3, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
+            ctx->compressWaitCreateCompletion = ctx->createCompletion;
+            ctx->compressWaitWriteCompletion = ctx->writeCompletion;
+            DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
             DEBUG(3, "create completion: %f\n", ctx->createCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
@@ -462,7 +486,7 @@ static void* compressionThread(void* arg)
                     /* update completion */
                     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
                     ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
-                    DEBUG(3, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
+                    DEBUG(3, "compression completion %f\n", ctx->compressionCompletion);
                     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 }
             } while (remaining != 0);
@@ -517,8 +541,9 @@ static void* outputThread(void* arg)
         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             /* write thread is waiting, take measurement of compression completion */
-            ctx->compressionCompletionMeasured = ctx->compressionCompletion;
-            DEBUG(3, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
+            ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
+            ctx->writeWaitCreateCompletion = ctx->createCompletion;
+            DEBUG(3, "write thread waiting : writeWaitCreateCompletion %f : writeWaitCompressionCompletion %f\n", ctx->writeWaitCreateCompletion, ctx->writeWaitCompressionCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
@@ -541,7 +566,7 @@ static void* outputThread(void* arg)
             }
             {
                 // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
-                size_t const blockSize = MAX(compressedSize >> 7, 64 << 10);
+                size_t const blockSize = MAX(compressedSize >> 7, 1 << 10);
                 size_t pos = 0;
                 for ( ; ; ) {
                     size_t const writeSize = MIN(remaining, blockSize);
@@ -553,6 +578,7 @@ static void* outputThread(void* arg)
                     /* update completion variable for writing */
                     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
                     ctx->writeCompletion = 1 - (double)remaining/compressedSize;
+                    DEBUG(3, "write completion %f\n", ctx->writeCompletion);
                     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
                     if (remaining == 0) break;
@@ -599,8 +625,9 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
         /* creation thread is waiting, take measurement of compression completion */
-        ctx->compressionCompletionMeasured = ctx->compressionCompletion;
-        DEBUG(3, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
+        ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
+        ctx->createWaitWriteCompletion = ctx->writeCompletion;
+        DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
         DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);