]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
changed how the detection of the last job works
authorPaul Cruz <paulcruz74@fb.com>
Mon, 10 Jul 2017 23:27:58 +0000 (16:27 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Mon, 10 Jul 2017 23:27:58 +0000 (16:27 -0700)
contrib/adaptive-compression/adapt.c

index e1ac0aaf389746a3c5d779e2ffabc36a2492bd5c..8f3acd5c915da106b510ec6d11b256cf5d53c2ac 100644 (file)
@@ -50,6 +50,7 @@ typedef struct {
     buffer_t dst;
     unsigned compressionLevel;
     unsigned jobID;
+    unsigned lastJob;
     size_t compressedSize;
 } jobDescription;
 
@@ -57,7 +58,6 @@ typedef struct {
     unsigned compressionLevel;
     unsigned numActiveThreads;
     unsigned numJobs;
-    unsigned lastJobID;
     unsigned nextJobID;
     unsigned threadError;
     unsigned jobReadyID;
@@ -134,15 +134,15 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
     ctx->jobReadyID = 0;
     ctx->jobCompressedID = 0;
     ctx->jobWriteID = 0;
-    ctx->lastJobID = -1; /* intentional underflow */
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
-    /* allocating buffers for jobs */
+    /* initializing jobs */
     {
         unsigned jobNum;
         for (jobNum=0; jobNum<numJobs; jobNum++) {
             jobDescription* job = &ctx->jobs[jobNum];
             job->src.start = malloc(FILE_CHUNK_SIZE);
             job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
+            job->lastJob = 0;
             if (!job->src.start || !job->dst.start) {
                 DISPLAY("Could not allocate buffers for jobs\n");
                 freeCCtx(ctx);
@@ -265,7 +265,7 @@ static void* compressionThread(void* arg)
         pthread_mutex_unlock(&ctx->jobCompressed_mutex);
         DEBUGLOG(2, "finished job compression %u\n", currJob);
         currJob++;
-        if (currJob >= ctx->lastJobID || ctx->threadError) {
+        if (job->lastJob || ctx->threadError) {
             /* finished compressing all jobs */
             DEBUGLOG(2, "all jobs finished compressing\n");
             break;
@@ -327,7 +327,7 @@ static void* outputThread(void* arg)
         }
         DEBUGLOG(2, "finished job write %u\n", currJob);
         currJob++;
-        displayProgress(currJob, ctx->compressionLevel, currJob >= ctx->lastJobID);
+        displayProgress(currJob, ctx->compressionLevel, job->lastJob);
         DEBUGLOG(2, "locking job write mutex\n");
         pthread_mutex_lock(&ctx->jobWrite_mutex);
         ctx->jobWriteID++;
@@ -335,8 +335,7 @@ static void* outputThread(void* arg)
         pthread_mutex_unlock(&ctx->jobWrite_mutex);
         DEBUGLOG(2, "unlocking job write mutex\n");
 
-        DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID);
-        if (currJob >= ctx->lastJobID || ctx->threadError) {
+        if (job->lastJob || ctx->threadError) {
             /* finished with all jobs */
             DEBUGLOG(2, "all jobs finished writing\n");
             pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
@@ -349,7 +348,7 @@ static void* outputThread(void* arg)
     return arg;
 }
 
-static int createCompressionJob(adaptCCtx* ctx, size_t srcSize)
+static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
 {
     unsigned const nextJob = ctx->nextJobID;
     unsigned const nextJobIndex = nextJob % ctx->numJobs;
@@ -371,6 +370,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize)
     job->src.size = srcSize;
     job->dst.size = ZSTD_compressBound(srcSize);
     job->jobID = nextJob;
+    job->lastJob = last;
     memcpy(job->src.start, ctx->input.buffer.start, srcSize);
     pthread_mutex_lock(&ctx->jobReady_mutex);
     ctx->jobReadyID++;
@@ -457,7 +457,8 @@ static int compressFilename(const char* const srcFilename, const char* const dst
         g_streamedSize += readSize;
         /* reading was fine, now create the compression job */
         {
-            int const error = createCompressionJob(ctx, readSize);
+            int const last = feof(srcFile);
+            int const error = createCompressionJob(ctx, readSize, last);
             if (error != 0) {
                 ret = error;
                 ctx->threadError = 1;
@@ -466,7 +467,6 @@ static int compressFilename(const char* const srcFilename, const char* const dst
         }
         if (feof(srcFile)) {
             DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
-            ctx->lastJobID = ctx->nextJobID;
             break;
         }
     }