]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
make adaptCompressionLevel oscillate less
authorPaul Cruz <paulcruz74@fb.com>
Wed, 19 Jul 2017 23:36:33 +0000 (16:36 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 19 Jul 2017 23:36:33 +0000 (16:36 -0700)
contrib/adaptive-compression/adapt.c

index cc72d1558800c7c3f71a5364879d58da1a054ea8..ce92d9140d1f7b8d73ab6c9c016f36d4a834d3c6 100644 (file)
@@ -358,8 +358,8 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
                 DEBUG(3, "write completion: %f, create completion: %f\n", ctx->writeCompletion, ctx->createCompletion);
                 pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 {
-                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
-                    unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
+                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
+                    unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel);
                     DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
                     DEBUG(3, "write completion: %f\n", completion);
                     ctx->compressionLevel += change;
@@ -372,7 +372,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
                 completion = ctx->compressionCompletion;
                 pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 {
-                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
+                    unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE));
                     unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
                     DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
                     DEBUG(3, "completion: %f\n", completion);
@@ -388,10 +388,12 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
                 pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
 
                 pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                ctx->compressionCompletion = 1;
+                ctx->compressionCompletion = 0;
                 ctx->compressionCompletionMeasured = 0;
-                ctx->writeCompletion = 1;
+                ctx->writeCompletion = 0;
                 ctx->writeCompletionMeasured = 0;
+                ctx->createCompletion = 0;
+                ctx->createCompletionMeasured = 0;
                 pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             }
         }
@@ -423,6 +425,7 @@ static void* compressionThread(void* arg)
             reduceCounters(ctx);
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             ctx->createCompletionMeasured = 1;
+            DEBUG(2, "create completion: %f\n", ctx->createCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
@@ -465,8 +468,8 @@ static void* compressionThread(void* arg)
 
             do {
                 size_t const actualBlockSize = MIN(remaining, compressionBlockSize);
-                DEBUG(2, "remaining: %zu\n", remaining);
-                DEBUG(2, "actualBlockSize: %zu\n", actualBlockSize);
+                DEBUG(3, "remaining: %zu\n", remaining);
+                DEBUG(3, "actualBlockSize: %zu\n", actualBlockSize);
 
                 /* continue compression */
                 if (currJob != 0 || blockNum != 0) { /* not first block of first job flush/overwrite the frame header */
@@ -480,9 +483,9 @@ static void* compressionThread(void* arg)
                     ZSTD_invalidateRepCodes(ctx->cctx);
                 }
                 {
-                    DEBUG(2, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize));
-                    DEBUG(2, "lastJob %u\n", job->lastJob);
-                    DEBUG(2, "compressionBlockSize %zu\n", compressionBlockSize);
+                    DEBUG(3, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize));
+                    DEBUG(3, "lastJob %u\n", job->lastJob);
+                    DEBUG(3, "compressionBlockSize %zu\n", compressionBlockSize);
                     size_t const ret = (job->lastJob && remaining == actualBlockSize) ?
                                             ZSTD_compressEnd     (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
                                             ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
@@ -560,6 +563,7 @@ static void* outputThread(void* arg)
             reduceCounters(ctx);
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
             ctx->compressionCompletionMeasured = 1;
+            DEBUG(2, "compressionCompletion %f\n", ctx->compressionCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
@@ -576,7 +580,7 @@ static void* outputThread(void* arg)
             }
             {
                 // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
-                size_t const blockSize = 4 << 20;
+                size_t const blockSize = 64 << 10; /* 64 KB */
                 size_t pos = 0;
                 for ( ; ; ) {
                     size_t const writeSize = MIN(remaining, blockSize);
@@ -640,6 +644,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
         reduceCounters(ctx);
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
         ctx->writeCompletionMeasured = 1;
+        DEBUG(2, "writeCompletion: %f\n", ctx->writeCompletion);
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);