]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
semi working version that stabilizes
authorPaul Cruz <paulcruz74@fb.com>
Fri, 21 Jul 2017 01:45:33 +0000 (18:45 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Fri, 21 Jul 2017 01:45:33 +0000 (18:45 -0700)
contrib/adaptive-compression/adapt.c

index 3c6b7e904cdd4ffd144ef04f081e56b9f346bc9a..f25d22a4cbe4e146b6e70e248f2179d0a0170be4 100644 (file)
@@ -308,46 +308,46 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
  */
 static void adaptCompressionLevel(adaptCCtx* ctx)
 {
-    if (g_forceCompressionLevel) {
-        ctx->compressionLevel = g_compressionLevel;
-    }
-    else {
-        DEBUG(2, "compression level %u\n", ctx->compressionLevel);
-        /* check if compression is too slow */
-        unsigned createChange;
-        unsigned writeChange;
-        unsigned compressionChange;
-        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
-        writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
-        compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
-        DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
-        DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
-        DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
-        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+    /* check if compression is too slow */
+    unsigned createChange;
+    unsigned writeChange;
+    unsigned compressionChange;
+    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+    createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+    writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+    compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+    DEBUG(2, "compression level %u\n", ctx->compressionLevel);
+    DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
+    DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
+    DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
+    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
-        {
-            unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
-            DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
+    {
+        unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
 
-            if (compressionFastChange) {
-                DEBUG(2, "compression level too low\n");
-                ctx->compressionLevel += compressionFastChange;
-            }
-            else {
-                unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1);
-                DEBUG(2, "compression level too high\n");
-                ctx->compressionLevel -= compressionSlowChange;
-            }
+        DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
+
+        if (compressionFastChange) {
+            DEBUG(2, "compression level too low\n");
+            ctx->compressionLevel += compressionFastChange;
         }
+        else {
+            unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1);
+            DEBUG(2, "compression level too high\n");
+            ctx->compressionLevel -= compressionSlowChange;
+        }
+    }
 
-        /* reset */
-        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        ctx->createCompletionMeasured = 1;
-        ctx->compressionCompletionMeasured = 1;
-        ctx->writeCompletionMeasured = 1;
-        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-        DEBUG(2, "\n");
+    /* reset */
+    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+    ctx->createCompletionMeasured = 1;
+    ctx->compressionCompletionMeasured = 1;
+    ctx->writeCompletionMeasured = 1;
+    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+    DEBUG(2, "\n");
+
+    if (g_forceCompressionLevel) {
+        ctx->compressionLevel = g_compressionLevel;
     }
 }
 
@@ -368,10 +368,6 @@ static void* compressionThread(void* arg)
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "compressionThread(): waiting on job ready\n");
 
-        /* new job, reset completion */
-        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        ctx->compressionCompletion = 0;
-        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
         pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
         while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
@@ -387,9 +383,9 @@ static void* compressionThread(void* arg)
         }
         pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
 
-        /* reset create completion */
+        /* reset compression completion */
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        ctx->createCompletion = 0;
+        ctx->compressionCompletion = 0;
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
         DEBUG(3, "compressionThread(): continuing after job ready\n");
@@ -397,7 +393,7 @@ static void* compressionThread(void* arg)
         DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
 
         /* adapt compression level */
-        adaptCompressionLevel(ctx);
+        if (currJob) adaptCompressionLevel(ctx);
 
         /* compress the data */
         {
@@ -515,11 +511,6 @@ static void* outputThread(void* arg)
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "outputThread(): waiting on job compressed\n");
 
-        /* new job, reset completion */
-        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        ctx->writeCompletion = 0;
-        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-
         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
@@ -532,9 +523,9 @@ static void* outputThread(void* arg)
         }
         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
 
-        /* reset compression completion */
+        /* reset write completion */
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        ctx->compressionCompletion = 0;
+        ctx->writeCompletion = 0;
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
         DEBUG(3, "outputThread(): continuing after job compressed\n");
@@ -615,9 +606,9 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     }
     pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
 
-    /* reset write completion */
+    /* reset create completion */
     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-    ctx->writeCompletion = 0;
+    ctx->createCompletion = 0;
     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
     DEBUG(3, "createCompressionJob(): continuing after job write\n");
 
@@ -688,11 +679,6 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
             size_t const readBlockSize = 1 << 15;
             size_t remaining = FILE_CHUNK_SIZE;
 
-            /* new job reset completion */
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            ctx->createCompletion = 0;
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-
             while (remaining != 0 && !feof(srcFile)) {
                 size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
                 if (ret != readBlockSize && !feof(srcFile)) {