]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
removed calculation of file size and replaced with limited number of available jobs
authorPaul Cruz <paulcruz74@fb.com>
Wed, 5 Jul 2017 18:52:55 +0000 (11:52 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 5 Jul 2017 18:52:55 +0000 (11:52 -0700)
contrib/adaptive-compression/multi.c
contrib/adaptive-compression/run.sh

index b89d0c239d3f87e29b00a93621e1f053994071b6..a60f48ece3a49b7ff53e6107bdfe6ca551388cc8 100644 (file)
@@ -1,5 +1,6 @@
 #define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
 #define FILE_CHUNK_SIZE 4 << 20
+#define MAX_NUM_JOBS 30;
 typedef unsigned char BYTE;
 
 #include <stdio.h>      /* fprintf */
@@ -22,10 +23,13 @@ typedef struct {
     unsigned jobID;
     unsigned jobCompleted;
     unsigned jobReady;
+    unsigned jobWritten;
     pthread_mutex_t* jobCompleted_mutex;
     pthread_cond_t* jobCompleted_cond;
     pthread_mutex_t* jobReady_mutex;
     pthread_cond_t* jobReady_cond;
+    pthread_mutex_t* jobWrite_mutex;
+    pthread_cond_t* jobWrite_cond;
     size_t compressedSize;
 } jobDescription;
 
@@ -43,6 +47,8 @@ typedef struct {
     pthread_cond_t jobReady_cond;
     pthread_mutex_t allJobsCompleted_mutex;
     pthread_cond_t allJobsCompleted_cond;
+    pthread_mutex_t jobWrite_mutex;
+    pthread_cond_t jobWrite_cond;
     jobDescription* jobs;
     FILE* dstFile;
 } adaptCCtx;
@@ -66,12 +72,14 @@ static int freeCCtx(adaptCCtx* ctx)
         int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond);
         int const allJobsMutexError = pthread_mutex_destroy(&ctx->allJobsCompleted_mutex);
         int const allJobsCondError = pthread_cond_destroy(&ctx->allJobsCompleted_cond);
+        int const jobWriteMutexError = pthread_mutex_destroy(&ctx->jobWrite_mutex);
+        int const jobWriteCondError = pthread_cond_destroy(&ctx->jobWrite_cond);
         int const fileCloseError =  ctx->dstFile != NULL ? fclose(ctx->dstFile) : 0;
         if (ctx->jobs){
             freeCompressionJobs(ctx);
             free(ctx->jobs);
         }
-        return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError;
+        return completedMutexError | completedCondError | readyMutexError | readyCondError | fileCloseError | allJobsMutexError | allJobsCondError | jobWriteMutexError | jobWriteCondError;
     }
 }
 
@@ -91,6 +99,8 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
     pthread_cond_init(&ctx->jobReady_cond, NULL);
     pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
     pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
+    pthread_mutex_init(&ctx->jobWrite_mutex, NULL);
+    pthread_cond_init(&ctx->jobWrite_cond, NULL);
     ctx->numJobs = numJobs;
     ctx->lastJobID = -1; /* intentional underflow */
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
@@ -101,6 +111,9 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
             ctx->jobs[u].jobCompleted_cond = &ctx->jobCompleted_cond;
             ctx->jobs[u].jobReady_mutex = &ctx->jobReady_mutex;
             ctx->jobs[u].jobReady_cond = &ctx->jobReady_cond;
+            ctx->jobs[u].jobWrite_mutex = &ctx->jobWrite_mutex;
+            ctx->jobs[u].jobWrite_cond = &ctx->jobWrite_cond;
+            ctx->jobs[u].jobWritten = 1;
         }
     }
     ctx->nextJobID = 0;
@@ -139,7 +152,8 @@ static void* compressionThread(void* arg)
     adaptCCtx* ctx = (adaptCCtx*)arg;
     unsigned currJob = 0;
     for ( ; ; ) {
-        jobDescription* job = &ctx->jobs[currJob];
+        unsigned const currJobIndex = currJob % ctx->numJobs;
+        jobDescription* job = &ctx->jobs[currJobIndex];
         pthread_mutex_lock(job->jobReady_mutex);
         while(job->jobReady == 0) {
             pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
@@ -174,7 +188,8 @@ static void* outputThread(void* arg)
 
     unsigned currJob = 0;
     for ( ; ; ) {
-        jobDescription* job = &ctx->jobs[currJob];
+        unsigned const currJobIndex = currJob % ctx->numJobs;
+        jobDescription* job = &ctx->jobs[currJobIndex];
         pthread_mutex_lock(job->jobCompleted_mutex);
         while (job->jobCompleted == 0) {
             pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex);
@@ -187,7 +202,7 @@ static void* outputThread(void* arg)
                 return arg;
             }
             {
-                size_t const writeSize = fwrite(ctx->jobs[currJob].dst.start, 1, compressedSize, ctx->dstFile);
+                size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, ctx->dstFile);
                 if (writeSize != compressedSize) {
                     DISPLAY("Error: an error occurred during file write operation\n");
                     return arg;
@@ -195,6 +210,10 @@ static void* outputThread(void* arg)
             }
         }
         currJob++;
+        pthread_mutex_lock(job->jobWrite_mutex);
+        job->jobWritten = 1;
+        pthread_cond_signal(job->jobWrite_cond);
+        pthread_mutex_unlock(job->jobWrite_mutex);
         if (currJob >= ctx->lastJobID || ctx->threadError) {
             /* finished with all jobs */
             pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
@@ -232,13 +251,20 @@ static size_t getFileSize(const char* const filename)
 static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
 {
     unsigned const nextJob = ctx->nextJobID;
-    jobDescription* job = &ctx->jobs[nextJob];
+    unsigned const nextJobIndex = nextJob % ctx->numJobs;
+    jobDescription* job = &ctx->jobs[nextJobIndex];
+    pthread_mutex_lock(job->jobWrite_mutex);
+    while (job->jobWritten == 0) {
+        pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex);
+    }
+    pthread_mutex_unlock(job->jobWrite_mutex);
     job->compressionLevel = ctx->compressionLevel;
     job->src.start = malloc(srcSize);
     job->src.size = srcSize;
     job->dst.size = ZSTD_compressBound(srcSize);
     job->dst.start = malloc(job->dst.size);
     job->jobCompleted = 0;
+    job->jobWritten = 0;
     job->jobCompleted_cond = &ctx->jobCompleted_cond;
     job->jobCompleted_mutex = &ctx->jobCompleted_mutex;
     job->jobReady_cond = &ctx->jobReady_cond;
@@ -265,8 +291,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
     BYTE* const src = malloc(FILE_CHUNK_SIZE);
     FILE* const srcFile = fopen(srcFilename, "rb");
     size_t fileSize = getFileSize(srcFilename);
-    size_t const numJobsPrelim = (fileSize / ((size_t)FILE_CHUNK_SIZE));
-    size_t const numJobs = (numJobsPrelim * FILE_CHUNK_SIZE) == fileSize ? numJobsPrelim : numJobsPrelim + 1;
+    size_t const numJobs = MAX_NUM_JOBS;
     int ret = 0;
     adaptCCtx* ctx = NULL;
 
index b9c4caf7f341bb60b0d7d645624b3d88afd9bb93..9d2e987061063e572fda918b657d57b517391e09 100755 (executable)
@@ -1,6 +1,39 @@
 make clean multi
+
 ./multi tests/test2048.pdf tmp.zst
 zstd -d tmp.zst
 diff tmp tests/test2048.pdf
 echo "diff test complete"
+rm tmp*
+
+./multi tests/test512.pdf tmp.zst
+zstd -d tmp.zst
+diff tmp tests/test512.pdf
+echo "diff test complete"
+rm tmp*
+
+./multi tests/test64.pdf tmp.zst
+zstd -d tmp.zst
+diff tmp tests/test64.pdf
+echo "diff test complete"
+rm tmp*
+
+./multi tests/test16.pdf tmp.zst
+zstd -d tmp.zst
+diff tmp tests/test16.pdf
+echo "diff test complete"
+rm tmp*
+
+./multi tests/test4.pdf tmp.zst
+zstd -d tmp.zst
+diff tmp tests/test4.pdf
+echo "diff test complete"
+rm tmp*
+
+./multi tests/test.pdf tmp.zst
+zstd -d tmp.zst
+diff tmp tests/test.pdf
+echo "diff test complete"
+rm tmp*
+
 make clean