]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
reworked adaptCompressionLevel to only account for completion information
authorPaul Cruz <paulcruz74@fb.com>
Thu, 20 Jul 2017 23:19:16 +0000 (16:19 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Thu, 20 Jul 2017 23:19:16 +0000 (16:19 -0700)
contrib/adaptive-compression/adapt.c

index cf232ca7b26ea969dd7f360393da1f8d1845a3ef..84e689a20b272f7990965c664378bcb4f402b80a 100644 (file)
@@ -29,7 +29,6 @@
 
 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
-static unsigned g_displayStats = 0;
 static UTIL_time_t g_startTime;
 static size_t g_streamedSize = 0;
 static unsigned g_useProgressBar = 0;
@@ -47,15 +46,6 @@ typedef struct {
     buffer_t buffer;
 } inBuff_t;
 
-typedef struct {
-    unsigned waitCompressed;
-    unsigned waitReady;
-    unsigned waitWrite;
-    unsigned readyCounter;
-    unsigned compressedCounter;
-    unsigned writeCounter;
-} cStat_t;
-
 typedef struct {
     buffer_t src;
     buffer_t dst;
@@ -102,10 +92,9 @@ typedef struct {
     mutex_t jobWrite_mutex;
     cond_t jobWrite_cond;
     mutex_t completion_mutex;
-    mutex_t stats_mutex;
+    mutex_t wait_mutex;
     size_t lastDictSize;
     inBuff_t input;
-    cStat_t stats;
     jobDescription* jobs;
     ZSTD_CCtx* cctx;
 } adaptCCtx;
@@ -163,7 +152,7 @@ static int freeCCtx(adaptCCtx* ctx)
         error |= destroyMutex(&ctx->jobWrite_mutex);
         error |= destroyCond(&ctx->jobWrite_cond);
         error |= destroyMutex(&ctx->completion_mutex);
-        error |= destroyMutex(&ctx->stats_mutex);
+        error |= destroyMutex(&ctx->wait_mutex);
         error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
         free(ctx->input.buffer.start);
         if (ctx->jobs){
@@ -203,7 +192,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
         pthreadError |= initMutex(&ctx->jobWrite_mutex);
         pthreadError |= initCond(&ctx->jobWrite_cond);
         pthreadError |= initMutex(&ctx->completion_mutex);
-        pthreadError |= initMutex(&ctx->stats_mutex);
+        pthreadError |= initMutex(&ctx->wait_mutex);
         if (pthreadError) return pthreadError;
     }
     ctx->numJobs = numJobs;
@@ -211,6 +200,10 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
     ctx->jobCompressedID = 0;
     ctx->jobWriteID = 0;
     ctx->lastDictSize = 0;
+    ctx->createCompletionMeasured = 1;
+    ctx->compressionCompletionMeasured = 1;
+    ctx->writeCompletionMeasured = 1;
+
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
 
     if (!ctx->jobs) {
@@ -305,17 +298,6 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
     pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
 }
 
-/* this function normalizes counters when compression level is changing */
-static void reduceCounters(adaptCCtx* ctx)
-{
-    pthread_mutex_lock(&ctx->stats_mutex.pMutex);
-    unsigned const min = MIN(ctx->stats.compressedCounter, MIN(ctx->stats.writeCounter, ctx->stats.readyCounter));
-    ctx->stats.writeCounter -= min;
-    ctx->stats.compressedCounter -= min;
-    ctx->stats.readyCounter -= min;
-    pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
-}
-
 /*
  * Compression level is changed depending on which part of the compression process is lagging
  * Currently, three theads exist for job creation, compression, and file writing respectively.
@@ -330,67 +312,42 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
         ctx->compressionLevel = g_compressionLevel;
     }
     else {
-        unsigned reset = 0;
-        unsigned allSlow;
-        unsigned compressWaiting;
-        unsigned writeWaiting;
-        unsigned createWaiting;
-
-        pthread_mutex_lock(&ctx->stats_mutex.pMutex);
-        allSlow = ctx->adaptParam < ctx->stats.compressedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter;
-        compressWaiting = ctx->adaptParam < ctx->stats.readyCounter;
-        writeWaiting = ctx->adaptParam < ctx->stats.compressedCounter;
-        createWaiting = ctx->adaptParam < ctx->stats.writeCounter;
-        pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
-        DEBUG(2, "createWaiting %u\n", createWaiting);
-        DEBUG(2, "compressWaiting %u\n", compressWaiting);
-        DEBUG(2, "writeWaiting %u\n\n", writeWaiting);
+        DEBUG(2, "compression level %u\n", ctx->compressionLevel);
+        /* check if compression is too slow */
+        unsigned createChange;
+        unsigned writeChange;
+        unsigned compressionChange;
+        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+        writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+        compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE;
+        DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured);
+        DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
+        DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured);
+        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
         {
-            unsigned const writeSlow = (compressWaiting && createWaiting);
-            unsigned const compressSlow = (writeWaiting && createWaiting);
-            unsigned const createSlow = (compressWaiting && writeWaiting);
-            DEBUG(3, "createWaiting: %u, compressWaiting: %u, writeWaiting: %u\n", createWaiting, compressWaiting, writeWaiting);
-            if (allSlow) {
-                reset = 1;
-            }
-            else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
-                DEBUG(2, "increasing compression level %u\n", ctx->compressionLevel);
-                double completion;
-                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                completion = writeSlow ? ctx->writeCompletionMeasured : ctx->createCompletionMeasured;
-                DEBUG(2, "write completion: %f, create completion: %f\n", ctx->writeCompletionMeasured, ctx->createCompletionMeasured);
-                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-                {
-                    unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
-                    unsigned const change = MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel);
-                    DEBUG(3, "writeSlow: %u, change: %u\n", writeSlow, change);
-                    DEBUG(3, "write completion: %f\n", completion);
-                    ctx->compressionLevel += change;
-                    reset = 1;
-                }
-            }
-            else if (compressSlow && ctx->compressionLevel > 1) {
-                double completion;
-                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-                completion = ctx->compressionCompletionMeasured;
-                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-                {
-                    unsigned const maxChange = MAX_COMPRESSION_LEVEL_CHANGE - (unsigned)(completion*MAX_COMPRESSION_LEVEL_CHANGE);
-                    unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
-                    DEBUG(2, "decreasing compression level %u\n", ctx->compressionLevel);
-                    DEBUG(2, "completion: %f\n", completion);
-                    ctx->compressionLevel -= change;
-                    reset = 1;
-                }
+            unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel);
+            DEBUG(2, "compressionFastChange %u\n", compressionFastChange);
+
+            if (compressionFastChange) {
+                DEBUG(2, "compression level too low\n");
+                ctx->compressionLevel += compressionFastChange;
             }
-            if (reset) {
-                pthread_mutex_lock(&ctx->stats_mutex.pMutex);
-                ctx->stats.readyCounter = 0;
-                ctx->stats.writeCounter = 0;
-                ctx->stats.compressedCounter = 0;
-                pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
+            else {
+                unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1);
+                DEBUG(2, "compression level too high\n");
+                ctx->compressionLevel -= compressionSlowChange;
             }
         }
+
+        /* reset */
+        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        ctx->createCompletionMeasured = 1;
+        ctx->compressionCompletionMeasured = 1;
+        ctx->writeCompletionMeasured = 1;
+        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+        DEBUG(2, "\n");
     }
 }
 
@@ -410,21 +367,31 @@ static void* compressionThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "compressionThread(): waiting on job ready\n");
+
+        /* new job, reset completion */
+        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        ctx->compressionCompletion = 0;
+        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
         pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
         while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
-            pthread_mutex_lock(&ctx->stats_mutex.pMutex);
-            ctx->stats.waitReady++;
-            ctx->stats.readyCounter++;
-            pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
-            reduceCounters(ctx);
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            /* compression thread is waiting, take measurements of write completion and read completion */
             ctx->createCompletionMeasured = ctx->createCompletion;
+            ctx->writeCompletionMeasured = ctx->writeCompletion;
+            DEBUG(2, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured);
             DEBUG(3, "create completion: %f\n", ctx->createCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
         }
         pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
+        /* reset create completion */
+        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        ctx->createCompletion = 0;
+        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
         DEBUG(3, "compressionThread(): continuing after job ready\n");
         DEBUG(3, "DICTIONARY ENDED\n");
         DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
@@ -497,7 +464,7 @@ static void* compressionThread(void* arg)
                     /* update completion */
                     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
                     ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
-                    DEBUG(2, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
+                    DEBUG(3, "update on job %u: compression completion %f\n", currJob, ctx->compressionCompletion);
                     pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
                 }
             } while (remaining != 0);
@@ -547,21 +514,29 @@ static void* outputThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "outputThread(): waiting on job compressed\n");
+
+        /* new job, reset completion */
+        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        ctx->writeCompletion = 0;
+        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
-            pthread_mutex_lock(&ctx->stats_mutex.pMutex);
-            ctx->stats.waitCompressed++;
-            ctx->stats.compressedCounter++;
-            pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
-            reduceCounters(ctx);
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            /* write thread is waiting, take measurement of compression completion */
             ctx->compressionCompletionMeasured = ctx->compressionCompletion;
-            DEBUG(2, "waited on job %u: compressionCompletion %f\n", currJob, ctx->compressionCompletion);
+            DEBUG(2, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
         }
         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
+
+        /* reset compression completion */
+        pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+        ctx->compressionCompletion = 0;
+        pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
         DEBUG(3, "outputThread(): continuing after job compressed\n");
         {
             size_t const compressedSize = job->compressedSize;
@@ -615,6 +590,7 @@ static void* outputThread(void* arg)
             pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
             break;
         }
+
     }
     return arg;
 }
@@ -628,19 +604,21 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
     DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
     while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
-        pthread_mutex_lock(&ctx->stats_mutex.pMutex);
-        ctx->stats.waitWrite++;
-        ctx->stats.writeCounter++;
-        pthread_mutex_unlock(&ctx->stats_mutex.pMutex);
-        reduceCounters(ctx);
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        ctx->writeCompletionMeasured = ctx->writeCompletion;
+        /* creation thread is waiting, take measurement of compression completion */
+        ctx->compressionCompletionMeasured = ctx->compressionCompletion;
+        DEBUG(2, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured);
         DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
     }
     pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
+
+    /* reset write completion */
+    pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+    ctx->writeCompletion = 0;
+    pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
     DEBUG(3, "createCompressionJob(): continuing after job write\n");
 
     DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
@@ -677,14 +655,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     return 0;
 }
 
-static void printStats(cStat_t stats)
-{
-    DISPLAY("========STATISTICS========\n");
-    DISPLAY("# times waited on job ready: %u\n", stats.waitReady);
-    DISPLAY("# times waited on job compressed: %u\n", stats.waitCompressed);
-    DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite);
-}
-
 static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
 {
     if (!ctx || !srcFile || !otArg) {
@@ -710,48 +680,56 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA
             return 1;
         }
     }
+    {
+        unsigned currJob = 0;
+        /* creating jobs */
+        for ( ; ; ) {
+            size_t pos = 0;
+            size_t const readBlockSize = 1 << 15;
+            size_t remaining = FILE_CHUNK_SIZE;
+
+            /* new job reset completion */
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            ctx->createCompletion = 0;
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
 
-    /* creating jobs */
-    for ( ; ; ) {
-        size_t pos = 0;
-        size_t const readBlockSize = 1 << 15;
-        size_t remaining = FILE_CHUNK_SIZE;
-        while (remaining != 0 && !feof(srcFile)) {
-            size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
-            if (ret != readBlockSize && !feof(srcFile)) {
-                /* error could not read correct number of bytes */
+            while (remaining != 0 && !feof(srcFile)) {
+                size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
+                if (ret != readBlockSize && !feof(srcFile)) {
+                    /* error could not read correct number of bytes */
+                    DISPLAY("Error: problem occurred during read from src file\n");
+                    signalErrorToThreads(ctx);
+                    return 1;
+                }
+                pos += ret;
+                remaining -= ret;
+                pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+                ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
+                DEBUG(3, "create completion: %f\n", ctx->createCompletion);
+                pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+            }
+            if (remaining != 0 && !feof(srcFile)) {
                 DISPLAY("Error: problem occurred during read from src file\n");
                 signalErrorToThreads(ctx);
                 return 1;
             }
-            pos += ret;
-            remaining -= ret;
-            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
-            DEBUG(3, "create completion: %f\n", ctx->createCompletion);
-            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-        }
-        if (remaining != 0 && !feof(srcFile)) {
-            DISPLAY("Error: problem occurred during read from src file\n");
-            signalErrorToThreads(ctx);
-            return 1;
-        }
-        g_streamedSize += pos;
-        /* reading was fine, now create the compression job */
-        {
-            int const last = feof(srcFile);
-            int const error = createCompressionJob(ctx, pos, last);
-            if (error != 0) {
-                signalErrorToThreads(ctx);
-                return error;
+            g_streamedSize += pos;
+            /* reading was fine, now create the compression job */
+            {
+                int const last = feof(srcFile);
+                int const error = createCompressionJob(ctx, pos, last);
+                if (error != 0) {
+                    signalErrorToThreads(ctx);
+                    return error;
+                }
+            }
+            currJob++;
+            if (feof(srcFile)) {
+                DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
+                break;
             }
-        }
-        if (feof(srcFile)) {
-            DEBUG(3, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
-            break;
         }
     }
-
     /* success -- created all jobs */
     return 0;
 }
@@ -803,9 +781,6 @@ static int freeFileCompressionResources(fcResources* fcr)
 {
     int ret = 0;
     waitUntilAllJobsCompleted(fcr->ctx);
-    pthread_mutex_lock(&fcr->ctx->stats_mutex.pMutex);
-    if (g_displayStats) printStats(fcr->ctx->stats);
-    pthread_mutex_unlock(&fcr->ctx->stats_mutex.pMutex);
     ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
     ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
     if (fcr->otArg) {
@@ -873,7 +848,6 @@ static void help()
     PRINT("  -oFILE : specify the output file name\n");
     PRINT("  -v     : display debug information\n");
     PRINT("  -i#    : provide initial compression level\n");
-    PRINT("  -s     : display information stats\n");
     PRINT("  -h     : display help/information\n");
     PRINT("  -f     : force the compression level to stay constant\n");
 }
@@ -913,9 +887,6 @@ int main(int argCount, const char* argv[])
                     g_compressionLevel = readU32FromChar(&argument);
                     DEBUG(3, "g_compressionLevel: %u\n", g_compressionLevel);
                     break;
-                case 's':
-                    g_displayStats = 1;
-                    break;
                 case 'h':
                     help();
                     goto _main_exit;