]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added error detection for pthread initialization, added compression completion measur...
authorPaul Cruz <paulcruz74@fb.com>
Mon, 17 Jul 2017 17:12:44 +0000 (10:12 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Mon, 17 Jul 2017 17:12:44 +0000 (10:12 -0700)
contrib/adaptive-compression/adapt.c
lib/compress/zstd_compress.c
lib/zstd.h

index addd0c59d827112640dc96994b5c63a208b88516..d45bdf85b97af8fd446e9e895c8c36469598109a 100644 (file)
@@ -25,6 +25,7 @@
 #define DEFAULT_DISPLAY_LEVEL 1
 #define DEFAULT_COMPRESSION_LEVEL 6
 #define DEFAULT_ADAPT_PARAM 1
+#define MAX_COMPRESSION_LEVEL_CHANGE 10
 
 static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
 static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
@@ -65,6 +66,16 @@ typedef struct {
     size_t dictSize;
 } jobDescription;
 
+typedef struct {
+    pthread_mutex_t pMutex;
+    int noError;
+} mutex_t;
+
+typedef struct {
+    pthread_cond_t pCond;
+    int noError;
+} cond_t;
+
 typedef struct {
     unsigned compressionLevel;
     unsigned numActiveThreads;
@@ -76,14 +87,16 @@ typedef struct {
     unsigned jobWriteID;
     unsigned allJobsCompleted;
     unsigned adaptParam;
-    pthread_mutex_t jobCompressed_mutex;
-    pthread_cond_t jobCompressed_cond;
-    pthread_mutex_t jobReady_mutex;
-    pthread_cond_t jobReady_cond;
-    pthread_mutex_t allJobsCompleted_mutex;
-    pthread_cond_t allJobsCompleted_cond;
-    pthread_mutex_t jobWrite_mutex;
-    pthread_cond_t jobWrite_cond;
+    unsigned completionMeasured;
+    double completion;
+    mutex_t jobCompressed_mutex;
+    cond_t jobCompressed_cond;
+    mutex_t jobReady_mutex;
+    cond_t jobReady_cond;
+    mutex_t allJobsCompleted_mutex;
+    cond_t allJobsCompleted_cond;
+    mutex_t jobWrite_mutex;
+    cond_t jobWrite_cond;
     size_t lastDictSize;
     inBuff_t input;
     cStat_t stats;
@@ -107,20 +120,38 @@ static void freeCompressionJobs(adaptCCtx* ctx)
     }
 }
 
+static int destroyMutex(mutex_t* mutex)
+{
+    if (mutex->noError) {
+        int const ret = pthread_mutex_destroy(&mutex->pMutex);
+        return ret;
+    }
+    return 0;
+}
+
+static int destroyCond(cond_t* cond)
+{
+    if (cond->noError) {
+        int const ret = pthread_cond_destroy(&cond->pCond);
+        return ret;
+    }
+    return 0;
+}
+
 static int freeCCtx(adaptCCtx* ctx)
 {
     if (!ctx) return 0;
     {
         int error = 0;
-        error |= pthread_mutex_destroy(&ctx->jobCompressed_mutex);
-        error |= pthread_cond_destroy(&ctx->jobCompressed_cond);
-        error |= pthread_mutex_destroy(&ctx->jobReady_mutex);
-        error |= pthread_cond_destroy(&ctx->jobReady_cond);
-        error |= pthread_mutex_destroy(&ctx->allJobsCompleted_mutex);
-        error |= pthread_cond_destroy(&ctx->allJobsCompleted_cond);
-        error |= pthread_mutex_destroy(&ctx->jobWrite_mutex);
-        error |= pthread_cond_destroy(&ctx->jobWrite_cond);
-        error |=  (ctx->dstFile != NULL && ctx->dstFile != stdout) ? fclose(ctx->dstFile) : 0;
+        error |= destroyMutex(&ctx->jobCompressed_mutex);
+        error |= destroyCond(&ctx->jobCompressed_cond);
+        error |= destroyMutex(&ctx->jobReady_mutex);
+        error |= destroyCond(&ctx->jobReady_cond);
+        error |= destroyMutex(&ctx->allJobsCompleted_mutex);
+        error |= destroyCond(&ctx->allJobsCompleted_cond);
+        error |= destroyMutex(&ctx->jobWrite_mutex);
+        error |= destroyCond(&ctx->jobWrite_cond);
+        error |= (ctx->dstFile != NULL && ctx->dstFile != stdout) ? fclose(ctx->dstFile) : 0;
         error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
         free(ctx->input.buffer.start);
         if (ctx->jobs){
@@ -132,23 +163,41 @@ static int freeCCtx(adaptCCtx* ctx)
     }
 }
 
+static int initMutex(mutex_t* mutex)
+{
+    int const ret = pthread_mutex_init(&mutex->pMutex, NULL);
+    mutex->noError = !ret;
+    return ret;
+}
+
+static int initCond(cond_t* cond)
+{
+    int const ret = pthread_cond_init(&cond->pCond, NULL);
+    cond->noError = !ret;
+    return ret;
+}
+
 static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
 {
 
-    adaptCCtx* ctx = calloc(1, sizeof(adaptCCtx));
+    adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx));
     if (ctx == NULL) {
         DISPLAY("Error: could not allocate space for context\n");
         return NULL;
     }
     ctx->compressionLevel = g_compressionLevel;
-    pthread_mutex_init(&ctx->jobCompressed_mutex, NULL);
-    pthread_cond_init(&ctx->jobCompressed_cond, NULL);
-    pthread_mutex_init(&ctx->jobReady_mutex, NULL);
-    pthread_cond_init(&ctx->jobReady_cond, NULL);
-    pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
-    pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
-    pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
-    pthread_cond_init(&ctx->jobWrite_cond, NULL);
+    {
+        int pthreadError = 0;
+        pthreadError |= initMutex(&ctx->jobCompressed_mutex);
+        pthreadError |= initCond(&ctx->jobCompressed_cond);
+        pthreadError |= initMutex(&ctx->jobReady_mutex);
+        pthreadError |= initCond(&ctx->jobReady_cond);
+        pthreadError |= initMutex(&ctx->allJobsCompleted_mutex);
+        pthreadError |= initCond(&ctx->allJobsCompleted_cond);
+        pthreadError |= initMutex(&ctx->jobWrite_mutex);
+        pthreadError |= initCond(&ctx->jobWrite_cond);
+        if (pthreadError) return NULL;
+    }
     ctx->numJobs = numJobs;
     ctx->jobReadyID = 0;
     ctx->jobCompressedID = 0;
@@ -213,11 +262,11 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
 static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
 {
     if (!ctx) return;
-    pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
+    pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
     while (ctx->allJobsCompleted == 0) {
-        pthread_cond_wait(&ctx->allJobsCompleted_cond, &ctx->allJobsCompleted_mutex);
+        pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
     }
-    pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
+    pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
 }
 
 /*
@@ -252,14 +301,20 @@ static unsigned adaptCompressionLevel(adaptCCtx* ctx)
             reset = 1;
         }
         else if (compressSlow && ctx->compressionLevel > 1) {
+            double const completion = ctx->completion;
+            unsigned const maxChange = (unsigned)((1-completion) * MAX_COMPRESSION_LEVEL_CHANGE);
+            unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
             DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
-            ctx->compressionLevel--;
+            DEBUG(2, "completion: %f\n", completion);
+            ctx->compressionLevel -= change;
             reset = 1;
         }
         if (reset) {
             ctx->stats.readyCounter = 0;
             ctx->stats.writeCounter = 0;
             ctx->stats.compressedCounter = 0;
+            ctx->completion = 1;
+            ctx->completionMeasured = 0;
         }
         return ctx->compressionLevel;
     }
@@ -281,14 +336,14 @@ static void* compressionThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "compressionThread(): waiting on job ready\n");
-        pthread_mutex_lock(&ctx->jobReady_mutex);
+        pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
         while(currJob + 1 > ctx->jobReadyID) {
             ctx->stats.waitReady++;
             ctx->stats.readyCounter++;
             DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob);
-            pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
+            pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
         }
-        pthread_mutex_unlock(&ctx->jobReady_mutex);
+        pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
         DEBUG(3, "compressionThread(): continuing after job ready\n");
         DEBUG(3, "DICTIONARY ENDED\n");
         DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start);
@@ -299,7 +354,7 @@ static void* compressionThread(void* arg)
             DEBUG(3, "compression level used: %u\n", cLevel);
             /* begin compression */
             {
-                size_t useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
+                size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
                 DEBUG(2, "useDictSize: %zu, job->dictSize: %zu\n", useDictSize, job->dictSize);
                 size_t const dictModeError = ZSTD_setCCtxParameter(ctx->cctx, ZSTD_p_forceRawDict, 1);
                 size_t const initError = ZSTD_compressBegin_usingDict(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, cLevel);
@@ -332,11 +387,11 @@ static void* compressionThread(void* arg)
             }
             job->dst.size = job->compressedSize;
         }
-        pthread_mutex_lock(&ctx->jobCompressed_mutex);
+        pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         ctx->jobCompressedID++;
         DEBUG(3, "signaling for job %u\n", currJob);
-        pthread_cond_signal(&ctx->jobCompressed_cond);
-        pthread_mutex_unlock(&ctx->jobCompressed_mutex);
+        pthread_cond_signal(&ctx->jobCompressed_cond.pCond);
+        pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
         DEBUG(3, "finished job compression %u\n", currJob);
         currJob++;
         if (job->lastJob || ctx->threadError) {
@@ -374,14 +429,19 @@ static void* outputThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "outputThread(): waiting on job compressed\n");
-        pthread_mutex_lock(&ctx->jobCompressed_mutex);
+        pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         while (currJob + 1 > ctx->jobCompressedID) {
             ctx->stats.waitCompressed++;
             ctx->stats.compressedCounter++;
+            if (!ctx->completionMeasured) {
+                ctx->completion = ZSTD_getCompletion(ctx->cctx);
+                ctx->completionMeasured = 1;
+            }
+            DEBUG(2, "output detected completion: %f\n", ctx->completion);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
-            pthread_cond_wait(&ctx->jobCompressed_cond, &ctx->jobCompressed_mutex);
+            pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
         }
-        pthread_mutex_unlock(&ctx->jobCompressed_mutex);
+        pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
         DEBUG(3, "outputThread(): continuing after job compressed\n");
         {
             size_t const compressedSize = job->compressedSize;
@@ -403,19 +463,19 @@ static void* outputThread(void* arg)
         currJob++;
         displayProgress(currJob, ctx->compressionLevel, job->lastJob);
         DEBUG(3, "locking job write mutex\n");
-        pthread_mutex_lock(&ctx->jobWrite_mutex);
+        pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
         ctx->jobWriteID++;
-        pthread_cond_signal(&ctx->jobWrite_cond);
-        pthread_mutex_unlock(&ctx->jobWrite_mutex);
+        pthread_cond_signal(&ctx->jobWrite_cond.pCond);
+        pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
         DEBUG(3, "unlocking job write mutex\n");
 
         if (job->lastJob || ctx->threadError) {
             /* finished with all jobs */
             DEBUG(3, "all jobs finished writing\n");
-            pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
+            pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
             ctx->allJobsCompleted = 1;
-            pthread_cond_signal(&ctx->allJobsCompleted_cond);
-            pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
+            pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
+            pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
             break;
         }
     }
@@ -428,15 +488,20 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     unsigned const nextJobIndex = nextJob % ctx->numJobs;
     jobDescription* job = &ctx->jobs[nextJobIndex];
     DEBUG(3, "createCompressionJob(): wait for job write\n");
-    pthread_mutex_lock(&ctx->jobWrite_mutex);
+    pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
     DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
     while (nextJob - ctx->jobWriteID >= ctx->numJobs) {
         ctx->stats.waitWrite++;
         ctx->stats.writeCounter++;
+        if (!ctx->completionMeasured) {
+            ctx->completion = ZSTD_getCompletion(ctx->cctx);
+            ctx->completionMeasured = 1;
+        }
+        DEBUG(2, "job creation detected completion %f\n", ctx->completion);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
-        pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
+        pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
     }
-    pthread_mutex_unlock(&ctx->jobWrite_mutex);
+    pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
     DEBUG(3, "createCompressionJob(): continuing after job write\n");
 
     DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
@@ -446,10 +511,10 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     job->lastJob = last;
     memcpy(job->src.start, ctx->input.buffer.start, ctx->lastDictSize + srcSize);
     job->dictSize = ctx->lastDictSize;
-    pthread_mutex_lock(&ctx->jobReady_mutex);
+    pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
     ctx->jobReadyID++;
-    pthread_cond_signal(&ctx->jobReady_cond);
-    pthread_mutex_unlock(&ctx->jobReady_mutex);
+    pthread_cond_signal(&ctx->jobReady_cond.pCond);
+    pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
     DEBUG(3, "finished job creation %u\n", nextJob);
     ctx->nextJobID++;
     DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
index f492d92bdab360f17b841a5212864c5e57b97d2a..0c8edecec117535e669f324685e251a069b74019 100644 (file)
@@ -140,6 +140,9 @@ struct ZSTD_CCtx_s {
     /* Multi-threading */
     U32 nbThreads;
     ZSTDMT_CCtx* mtctx;
+
+    /* adaptive compression */
+    double completion;
 };
 
 
@@ -2845,6 +2848,7 @@ static size_t ZSTD_compress_frameChunk (ZSTD_CCtx* cctx,
     BYTE* op = ostart;
     U32 const maxDist = 1 << cctx->appliedParams.cParams.windowLog;
 
+    cctx->completion = 0;
     if (cctx->appliedParams.fParams.checksumFlag && srcSize)
         XXH64_update(&cctx->xxhState, src, srcSize);
 
@@ -2895,6 +2899,7 @@ static size_t ZSTD_compress_frameChunk (ZSTD_CCtx* cctx,
         }
 
         remaining -= blockSize;
+        cctx->completion = 1 - (double)remaining/srcSize;
         dstCapacity -= cSize;
         ip += blockSize;
         op += cSize;
@@ -2997,6 +3002,10 @@ static size_t ZSTD_compressContinue_internal (ZSTD_CCtx* cctx,
         return fhSize;
 }
 
+ZSTDLIB_API double ZSTD_getCompletion(ZSTD_CCtx* cctx)
+{
+    return cctx->completion;
+}
 
 size_t ZSTD_compressContinue (ZSTD_CCtx* cctx,
                               void* dst, size_t dstCapacity,
index 58e9a5606db8eb9bc92cdc381a93c520a6fde361..e835ad3a7db2808dec3337d17b7c19d83f2f897e 100644 (file)
@@ -808,7 +808,11 @@ ZSTDLIB_API size_t ZSTD_copyCCtx(ZSTD_CCtx* cctx, const ZSTD_CCtx* preparedCCtx,
 ZSTDLIB_API size_t ZSTD_compressContinue(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
 ZSTDLIB_API size_t ZSTD_compressEnd(ZSTD_CCtx* cctx, void* dst, size_t dstCapacity, const void* src, size_t srcSize);
 
-
+/*! ZSTD_getCompletion: get a double representing how much of a file/buffer has been compressed
+ *                      using ZSTD_compressContinue()
+ * return: a double value in the range of 0 to 1 representing how much a compression job has finished
+ */
+ZSTDLIB_API double ZSTD_getCompletion(ZSTD_CCtx* cctx);
 
 /*-
   Buffer-less streaming decompression (synchronous mode)