]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
fixed the problem with pipeline tests by changing how jobs move through the threads
authorPaul Cruz <paulcruz74@fb.com>
Thu, 6 Jul 2017 00:24:21 +0000 (17:24 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Thu, 6 Jul 2017 00:24:21 +0000 (17:24 -0700)
contrib/adaptive-compression/multi.c

index 40045381f8da08a3bba12c4a2c18cb4b9b92b8d1..fd04a53b811a677d5e0a166dbb9ee1d5c76a5cdf 100644 (file)
@@ -26,15 +26,6 @@ typedef struct {
     buffer_t dst;
     unsigned compressionLevel;
     unsigned jobID;
-    unsigned jobCompleted;
-    unsigned jobReady;
-    unsigned jobWritten;
-    pthread_mutex_t* jobCompleted_mutex;
-    pthread_cond_t* jobCompleted_cond;
-    pthread_mutex_t* jobReady_mutex;
-    pthread_cond_t* jobReady_cond;
-    pthread_mutex_t* jobWrite_mutex;
-    pthread_cond_t* jobWrite_cond;
     size_t compressedSize;
 } jobDescription;
 
@@ -45,6 +36,9 @@ typedef struct {
     unsigned lastJobID;
     unsigned nextJobID;
     unsigned threadError;
+    unsigned jobReadyID;
+    unsigned jobCompletedID;
+    unsigned jobWrittenID;
     unsigned allJobsCompleted;
     pthread_mutex_t jobCompleted_mutex;
     pthread_cond_t jobCompleted_cond;
@@ -107,20 +101,11 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
     pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
     pthread_cond_init(&ctx->jobWrite_cond, NULL);
     ctx->numJobs = numJobs;
+    ctx->jobReadyID = 0;
+    ctx->jobCompletedID = 0;
+    ctx->jobWrittenID = 0;
     ctx->lastJobID = -1; /* intentional underflow */
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
-    {
-        unsigned u;
-        for (u=0; u<numJobs; u++) {
-            ctx->jobs[u].jobCompleted_mutex = &ctx->jobCompleted_mutex;
-            ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond;
-            ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex;
-            ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond;
-            ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex;
-            ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond;
-            ctx->jobs[u].jobWritten = 1;
-        }
-    }
     ctx->nextJobID = 0;
     ctx->threadError = 0;
     ctx->allJobsCompleted = 0;
@@ -161,11 +146,11 @@ static void* compressionThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         // DEBUGLOG(2, "compressionThread(): waiting on job ready\n");
-        pthread_mutex_lock(job->jobReady_mutex);
-        while(job->jobReady == 0) {
-            pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
+        pthread_mutex_lock(&ctx->jobReady_mutex);
+        while(currJob + 1 > ctx->jobReadyID) {
+            pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
         }
-        pthread_mutex_unlock(job->jobReady_mutex);
+        pthread_mutex_unlock(&ctx->jobReady_mutex);
         // DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
         /* compress the data */
         {
@@ -177,11 +162,11 @@ static void* compressionThread(void* arg)
             }
             job->compressedSize = compressedSize;
         }
-        pthread_mutex_lock(job->jobCompleted_mutex);
-        job->jobCompleted = 1;
+        pthread_mutex_lock(&ctx->jobCompleted_mutex);
+        ctx->jobCompletedID++;
         DEBUGLOG(2, "signaling for job %u\n", currJob);
-        pthread_cond_signal(job->jobCompleted_cond);
-        pthread_mutex_unlock(job->jobCompleted_mutex);
+        pthread_cond_signal(&ctx->jobCompleted_cond);
+        pthread_mutex_unlock(&ctx->jobCompleted_mutex);
         currJob++;
         if (currJob >= ctx->lastJobID || ctx->threadError) {
             /* finished compressing all jobs */
@@ -201,12 +186,12 @@ static void* outputThread(void* arg)
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
         DEBUGLOG(2, "outputThread(): waiting on job completed\n");
-        pthread_mutex_lock(job->jobCompleted_mutex);
-        while (job->jobCompleted == 0) {
+        pthread_mutex_lock(&ctx->jobCompleted_mutex);
+        while (currJob + 1 > ctx->jobCompletedID) {
             DEBUGLOG(2, "inside job completed wait loop waiting on %u\n", currJob);
-            pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex);
+            pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex);
         }
-        pthread_mutex_unlock(job->jobCompleted_mutex);
+        pthread_mutex_unlock(&ctx->jobCompleted_mutex);
         DEBUGLOG(2, "outputThread(): continuing after job completed\n");
         {
             size_t const compressedSize = job->compressedSize;
@@ -224,10 +209,10 @@ static void* outputThread(void* arg)
         }
         currJob++;
         DEBUGLOG(2, "locking job write mutex\n");
-        pthread_mutex_lock(job->jobWrite_mutex);
-        job->jobWritten = 1;
-        pthread_cond_signal(job->jobWrite_cond);
-        pthread_mutex_unlock(job->jobWrite_mutex);
+        pthread_mutex_lock(&ctx->jobWrite_mutex);
+        ctx->jobWrittenID++;
+        pthread_cond_signal(&ctx->jobWrite_cond);
+        pthread_mutex_unlock(&ctx->jobWrite_mutex);
         DEBUGLOG(2, "unlocking job write mutex\n");
 
         DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID);
@@ -250,23 +235,17 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
     unsigned const nextJobIndex = nextJob % ctx->numJobs;
     jobDescription* job = &ctx->jobs[nextJobIndex];
     // DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
-    pthread_mutex_lock(job->jobWrite_mutex);
-    while (job->jobWritten == 0) {
-        pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex);
+    pthread_mutex_lock(&ctx->jobWrite_mutex);
+    while (nextJob - ctx->jobWrittenID >= ctx->numJobs) {
+        pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
     }
-    pthread_mutex_unlock(job->jobWrite_mutex);
+    pthread_mutex_unlock(&ctx->jobWrite_mutex);
     // DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
     job->compressionLevel = ctx->compressionLevel;
     job->src.start = malloc(srcSize);
     job->src.size = srcSize;
     job->dst.size = ZSTD_compressBound(srcSize);
     job->dst.start = malloc(job->dst.size);
-    job->jobCompleted = 0;
-    job->jobWritten = 0;
-    job->jobCompleted_cond = &ctx->jobCompleted_cond;
-    job->jobCompleted_mutex = &ctx->jobCompleted_mutex;
-    job->jobReady_cond = &ctx->jobReady_cond;
-    job->jobReady_mutex = &ctx->jobReady_mutex;
     job->jobID = nextJob;
     if (!job->src.start || !job->dst.start) {
         /* problem occurred, free things then return */
@@ -276,10 +255,10 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
         return 1;
     }
     memcpy(job->src.start, data, srcSize);
-    pthread_mutex_lock(job->jobReady_mutex);
-    job->jobReady = 1;
-    pthread_cond_signal(job->jobReady_cond);
-    pthread_mutex_unlock(job->jobReady_mutex);
+    pthread_mutex_lock(&ctx->jobReady_mutex);
+    ctx->jobReadyID++;
+    pthread_cond_signal(&ctx->jobReady_cond);
+    pthread_mutex_unlock(&ctx->jobReady_mutex);
     ctx->nextJobID++;
     return 0;
 }