]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added code for waiitng for all jobs to finish
authorPaul Cruz <paulcruz74@fb.com>
Tue, 4 Jul 2017 02:24:22 +0000 (19:24 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Tue, 4 Jul 2017 02:24:22 +0000 (19:24 -0700)
contrib/adaptive-compression/v2
contrib/adaptive-compression/v2.c

index 0c0e194ebae05845c80f8f24d2534a53a56c414d..38034cdab55dde6c55382233c34d86b0b58d2083 100755 (executable)
Binary files a/contrib/adaptive-compression/v2 and b/contrib/adaptive-compression/v2 differ
index 5bbb8b9bf6ba7df193dae0946d0a8114cd238a3c..eaaecb76c9d7c2cc0f427c03fa85e9fcd3a22347 100644 (file)
@@ -35,10 +35,13 @@ typedef struct {
     unsigned numJobs;
     unsigned nextJobID;
     unsigned threadError;
+    unsigned allJobsCompleted;
     pthread_mutex_t jobCompleted_mutex;
     pthread_cond_t jobCompleted_cond;
     pthread_mutex_t jobReady_mutex;
     pthread_cond_t jobReady_cond;
+    pthread_mutex_t allJobsCompleted_mutex;
+    pthread_cond_t allJobsCompleted_cond;
     jobDescription* jobs;
     FILE* dstFile;
 } adaptCCtx;
@@ -57,10 +60,13 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
     pthread_cond_init(&ctx->jobCompleted_cond, NULL);
     pthread_mutex_init(&ctx->jobReady_mutex, NULL);
     pthread_cond_init(&ctx->jobReady_cond, NULL);
+    pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
+    pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
     ctx->numJobs = numJobs;
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
     ctx->nextJobID = 0;
     ctx->threadError = 0;
+    ctx->allJobsCompleted = 0;
     if (!ctx->jobs) {
         DISPLAY("Error: could not allocate space for jobs during context creation\n");
         return NULL;
@@ -90,15 +96,21 @@ static void freeCompressionJobs(adaptCCtx* ctx)
 
 static int freeCCtx(adaptCCtx* ctx)
 {
-    /* TODO: wait until jobs finish */
-    int const completedMutexError = pthread_mutex_destroy(&ctx->jobCompleted_mutex);
-    int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond);
-    int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex);
-    int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond);
-    int const fileError =  fclose(ctx->dstFile);
-    freeCompressionJobs(ctx);
-    free(ctx->jobs);
-    return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError;
+    pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
+    while (ctx->allJobsCompleted == 0) {
+        pthread_cond_wait(&ctx->allJobsCompleted_cond, &ctx->allJobsCompleted_mutex);
+    }
+    pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
+    {
+        int const completedMutexError = pthread_mutex_destroy(&ctx->jobCompleted_mutex);
+        int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond);
+        int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex);
+        int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond);
+        int const fileError =  fclose(ctx->dstFile);
+        freeCompressionJobs(ctx);
+        free(ctx->jobs);
+        return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError;
+    }
 }
 
 static void* compressionThread(void* arg)
@@ -164,6 +176,10 @@ static void* outputThread(void* arg)
         currJob++;
         if (currJob >= ctx->numJobs || ctx->threadError) {
             /* finished with all jobs */
+            pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
+            ctx->allJobsCompleted = 1;
+            pthread_cond_signal(&ctx->allJobsCompleted_cond);
+            pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
             break;
         }
     }
@@ -267,6 +283,7 @@ int main(int argCount, const char* argv[])
             goto cleanup;
         }
     }
+
     /* create compression thread */
     {
         pthread_t compression;
@@ -297,7 +314,7 @@ int main(int argCount, const char* argv[])
         }
         if (feof(srcFile)) break;
     }
-    DISPLAY("cleanup\n");
+
 cleanup:
     /* file compression completed */
     ret  |= (srcFile != NULL) ? fclose(srcFile) : 0;