]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
rename completion variable, split up fwrite operations in order to track progress
authorPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 20:30:29 +0000 (13:30 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Tue, 18 Jul 2017 20:30:29 +0000 (13:30 -0700)
contrib/adaptive-compression/adapt.c

index 62f5ec912f92f6246a8c1633a70cebfc8316ff90..404db6d94923b9d21964e8bb0f94945c3cbbcb74 100644 (file)
@@ -87,8 +87,8 @@ typedef struct {
     unsigned jobWriteID;
     unsigned allJobsCompleted;
     unsigned adaptParam;
-    unsigned completionMeasured;
-    double completion;
+    unsigned compressionCompletionMeasured;
+    double compressionCompletion;
     mutex_t jobCompressed_mutex;
     cond_t jobCompressed_cond;
     mutex_t jobReady_mutex;
@@ -319,7 +319,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
             reset = 1;
         }
         else if (compressSlow && ctx->compressionLevel > 1) {
-            double const completion = ctx->completion;
+            double const completion = ctx->compressionCompletion;
             unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
             unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
             DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
@@ -331,8 +331,8 @@ static void adaptCompressionLevel(adaptCCtx* ctx)
             ctx->stats.readyCounter = 0;
             ctx->stats.writeCounter = 0;
             ctx->stats.compressedCounter = 0;
-            ctx->completion = 1;
-            ctx->completionMeasured = 0;
+            ctx->compressionCompletion = 1;
+            ctx->compressionCompletionMeasured = 0;
         }
     }
 }
@@ -455,12 +455,12 @@ static void* outputThread(void* arg)
             ctx->stats.waitCompressed++;
             ctx->stats.compressedCounter++;
             reduceCounters(ctx);
-            if (!ctx->completionMeasured) {
-                ctx->completion = ZSTD_getCompletion(ctx->cctx);
-                ctx->completionMeasured = 1;
+            if (!ctx->compressionCompletionMeasured) {
+                ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
+                ctx->compressionCompletionMeasured = 1;
             }
             adaptCompressionLevel(ctx);
-            DEBUG(3, "output detected completion: %f\n", ctx->completion);
+            DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
             DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
         }
@@ -468,14 +468,25 @@ static void* outputThread(void* arg)
         DEBUG(3, "outputThread(): continuing after job compressed\n");
         {
             size_t const compressedSize = job->compressedSize;
+            size_t remaining = compressedSize;
             if (ZSTD_isError(compressedSize)) {
                 DISPLAY("Error: an error occurred during compression\n");
                 signalErrorToThreads(ctx);
                 return arg;
             }
             {
-                size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
-                if (writeSize != compressedSize) {
+                // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
+                size_t const blockSize = 4 << 20;
+                size_t pos = 0;
+                for ( ; ; ) {
+                    size_t const writeSize = MIN(remaining, blockSize);
+                    size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
+                    if (ret != writeSize) break;
+                    pos += ret;
+                    remaining -= ret;
+                    if (remaining == 0) break;
+                }
+                if (pos != compressedSize) {
                     DISPLAY("Error: an error occurred during file write operation\n");
                     signalErrorToThreads(ctx);
                     return arg;
@@ -517,12 +528,12 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
         ctx->stats.waitWrite++;
         ctx->stats.writeCounter++;
         reduceCounters(ctx);
-        if (!ctx->completionMeasured) {
-            ctx->completion = ZSTD_getCompletion(ctx->cctx);
-            ctx->completionMeasured = 1;
+        if (!ctx->compressionCompletion) {
+            ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
+            ctx->compressionCompletionMeasured = 1;
         }
         adaptCompressionLevel(ctx);
-        DEBUG(3, "job creation detected completion %f\n", ctx->completion);
+        DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion);
         DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
     }