]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
switched over to model where reading only waits on compression thread
authorPaul Cruz <paulcruz74@fb.com>
Sat, 22 Jul 2017 00:49:39 +0000 (17:49 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Sat, 22 Jul 2017 00:49:39 +0000 (17:49 -0700)
contrib/adaptive-compression/adapt.c

index e07333fc3ccf2cf50100771b350a84e9dab52c54..9057bfcfc715dbef261ce0aaf6aedc320364618d 100644 (file)
@@ -51,7 +51,7 @@ typedef struct {
     buffer_t dst;
     unsigned compressionLevel;
     unsigned jobID;
-    unsigned lastJob;
+    unsigned lastJobPlusOne;
     size_t compressedSize;
     size_t dictSize;
 } jobDescription;
@@ -226,7 +226,7 @@ static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
             jobDescription* job = &ctx->jobs[jobNum];
             job->src.start = malloc(2 * FILE_CHUNK_SIZE);
             job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
-            job->lastJob = 0;
+            job->lastJobPlusOne = 0;
             if (!job->src.start || !job->dst.start) {
                 DISPLAY("Could not allocate buffers for jobs\n");
                 return 1;
@@ -400,22 +400,30 @@ static void* compressionThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "compressionThread(): waiting on job ready\n");
-
-
+        /* wait until job is ready */
         pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
-        while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
+        while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-            /* compression thread is waiting, take measurements of write completion and read completion */
+            /* compression thread is waiting on creation thread, take measurement */
             ctx->compressWaitCreateCompletion = ctx->createCompletion;
-            ctx->compressWaitWriteCompletion = ctx->writeCompletion;
-            DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f : compressWaitWriteCompletion %f\n", ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
+            DEBUG(3, "compression thread waiting : compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
             DEBUG(3, "create completion: %f\n", ctx->createCompletion);
-            DEBUG(2, "compression thread waiting for nextJob: %u, compressWaitCreateCompletion %f, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitCreateCompletion, ctx->compressWaitWriteCompletion);
+            DEBUG(2, "compression thread waiting for ready: %u, compressWaitCreateCompletion %f\n", currJob, ctx->compressWaitCreateCompletion);
             pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
             pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
         }
         pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
 
+        /* wait until job previously in this space is written */
+        pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
+        while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
+            pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+            ctx->compressWaitWriteCompletion = ctx->writeCompletion;
+            DEBUG(2, "compression thread waiting for write: %u, compressWaitWriteCompletion %f\n", currJob, ctx->compressWaitWriteCompletion);
+            pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+            pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
+        }
+        pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
         /* reset compression completion */
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
         ctx->compressionCompletion = 0;
@@ -439,10 +447,8 @@ static void* compressionThread(void* arg)
             size_t dstPos = 0;
             DEBUG(3, "cLevel used: %u\n", cLevel);
             DEBUG(3, "compression level used: %u\n", cLevel);
-
             /* reset compressed size */
             job->compressedSize = 0;
-
             /* begin compression */
             {
                 size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
@@ -474,10 +480,10 @@ static void* compressionThread(void* arg)
                     ZSTD_invalidateRepCodes(ctx->cctx);
                 }
                 {
-                    DEBUG(3, "write out ending: %d\n", job->lastJob && (remaining == actualBlockSize));
-                    DEBUG(3, "lastJob %u\n", job->lastJob);
+                    DEBUG(3, "write out ending: %d\n", (job->lastJobPlusOne == currJob + 1) && (remaining == actualBlockSize));
+                    DEBUG(3, "lastJobPlusOne %u\n", job->lastJobPlusOne);
                     DEBUG(3, "compressionBlockSize %zu\n", compressionBlockSize);
-                    size_t const ret = (job->lastJob && remaining == actualBlockSize) ?
+                    size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
                                             ZSTD_compressEnd     (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
                                             ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
                     if (ZSTD_isError(ret)) {
@@ -503,15 +509,16 @@ static void* compressionThread(void* arg)
         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         ctx->jobCompressedID++;
         DEBUG(3, "signaling for job %u\n", currJob);
-        pthread_cond_signal(&ctx->jobCompressed_cond.pCond);
+        pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
         pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
         DEBUG(3, "finished job compression %u\n", currJob);
-        currJob++;
-        if (job->lastJob || ctx->threadError) {
+        if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
             /* finished compressing all jobs */
             DEBUG(3, "all jobs finished compressing\n");
             break;
         }
+
+        currJob++;
     }
     return arg;
 }
@@ -544,7 +551,6 @@ static void* outputThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUG(3, "outputThread(): waiting on job compressed\n");
-
         pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
         while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
             pthread_mutex_lock(&ctx->completion_mutex.pMutex);
@@ -599,8 +605,7 @@ static void* outputThread(void* arg)
             }
         }
         DEBUG(3, "finished job write %u\n", currJob);
-        currJob++;
-        displayProgress(currJob, ctx->compressionLevel, job->lastJob);
+        displayProgress(currJob, ctx->compressionLevel, job->lastJobPlusOne == currJob + 1);
         DEBUG(3, "locking job write mutex\n");
         pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
         ctx->jobWriteID++;
@@ -608,7 +613,7 @@ static void* outputThread(void* arg)
         pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
         DEBUG(3, "unlocking job write mutex\n");
 
-        if (job->lastJob || ctx->threadError) {
+        if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
             /* finished with all jobs */
             DEBUG(3, "all jobs finished writing\n");
             pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
@@ -617,6 +622,7 @@ static void* outputThread(void* arg)
             pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
             break;
         }
+        currJob++;
 
     }
     return arg;
@@ -628,21 +634,22 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     unsigned const nextJobIndex = nextJob % ctx->numJobs;
     jobDescription* job = &ctx->jobs[nextJobIndex];
     DEBUG(3, "createCompressionJob(): wait for job write\n");
-    pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
-    DEBUG(3, "Creating new compression job -- nextJob: %u, jobCompressedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompressedID, ctx->jobWriteID, ctx->numJobs);
-    while (nextJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
+
+
+    /* wait until the job has been compressed */
+    pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
+    while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
         pthread_mutex_lock(&ctx->completion_mutex.pMutex);
-        /* creation thread is waiting, take measurement of compression completion */
+        /* creation thread is waiting, take measurement of completion */
         ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
         ctx->createWaitWriteCompletion = ctx->writeCompletion;
         DEBUG(3, "creation thread waiting : createWaitCompressionCompletion %f : createWaitWriteCompletion %f\n", ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
         DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion);
         DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f, createWaitWriteCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion, ctx->createWaitWriteCompletion);
         pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
-        pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
+        pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
     }
-    pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
-
+    pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
     /* reset create completion */
     pthread_mutex_lock(&ctx->completion_mutex.pMutex);
     ctx->createCompletion = 0;
@@ -653,7 +660,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
     job->compressionLevel = ctx->compressionLevel;
     job->src.size = srcSize;
     job->jobID = nextJob;
-    job->lastJob = last;
+    if (last) job->lastJobPlusOne = nextJob + 1;
     {
         /* swap buffer */
         void* const copy = job->src.start;