]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
working I believe
authorPaul Cruz <paulcruz74@fb.com>
Tue, 4 Jul 2017 03:05:42 +0000 (20:05 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Tue, 4 Jul 2017 03:05:42 +0000 (20:05 -0700)
contrib/adaptive-compression/v2.c

index edb2d0f3f39c8b7971f5e273085fdd4ea5dc4ac1..2f58e75034b0c8638aa0302758a5951ec334c9f2 100644 (file)
@@ -33,6 +33,7 @@ typedef struct {
     unsigned compressionLevel;
     unsigned numActiveThreads;
     unsigned numJobs;
+    unsigned lastJobID;
     unsigned nextJobID;
     unsigned threadError;
     unsigned allJobsCompleted;
@@ -63,7 +64,9 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
     pthread_mutex_init(&ctx->allJobsCompleted_mutex, NULL);
     pthread_cond_init(&ctx->allJobsCompleted_cond, NULL);
     ctx->numJobs = numJobs;
+    ctx->lastJobID = -1; /* intentional underflow */
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
+    DISPLAY("jobs %p\n", ctx->jobs);
     {
         unsigned u;
         for (u=0; u<numJobs; u++) {
@@ -115,10 +118,12 @@ static int freeCCtx(adaptCCtx* ctx)
         int const completedCondError = pthread_cond_destroy(&ctx->jobCompleted_cond);
         int const readyMutexError = pthread_mutex_destroy(&ctx->jobReady_mutex);
         int const readyCondError = pthread_cond_destroy(&ctx->jobReady_cond);
+        int const allJobsMutexError = pthread_mutex_destroy(&ctx->allJobsCompleted_mutex);
+        int const allJobsCondError = pthread_cond_destroy(&ctx->allJobsCompleted_cond);
         int const fileError =  fclose(ctx->dstFile);
         freeCompressionJobs(ctx);
         free(ctx->jobs);
-        return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError;
+        return completedMutexError | completedCondError | readyMutexError | readyCondError | fileError | allJobsMutexError | allJobsCondError;
     }
 }
 
@@ -131,10 +136,10 @@ static void* compressionThread(void* arg)
         jobDescription* job = &ctx->jobs[currJob];
         pthread_mutex_lock(job->jobReady_mutex);
         while(job->jobReady == 0) {
+            DISPLAY("waiting\n");
             pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
         }
         pthread_mutex_unlock(job->jobReady_mutex);
-
         /* compress the data */
         {
             size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, job->compressionLevel);
@@ -145,8 +150,12 @@ static void* compressionThread(void* arg)
             }
             job->compressedSize = compressedSize;
         }
+        pthread_mutex_lock(job->jobCompleted_mutex);
+        job->jobCompleted = 1;
+        pthread_cond_signal(job->jobCompleted_cond);
+        pthread_mutex_unlock(job->jobCompleted_mutex);
         currJob++;
-        if (currJob >= ctx->numJobs || ctx->threadError) {
+        if (currJob >= ctx->lastJobID || ctx->threadError) {
             /* finished compressing all jobs */
             break;
         }
@@ -158,7 +167,6 @@ static void* outputThread(void* arg)
 {
     DISPLAY("started output thread\n");
     adaptCCtx* ctx = (adaptCCtx*)arg;
-    DISPLAY("casted ctx\n");
 
     unsigned currJob = 0;
     for ( ; ; ) {
@@ -183,7 +191,7 @@ static void* outputThread(void* arg)
             }
         }
         currJob++;
-        if (currJob >= ctx->numJobs || ctx->threadError) {
+        if (currJob >= ctx->lastJobID || ctx->threadError) {
             /* finished with all jobs */
             pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
             ctx->allJobsCompleted = 1;
@@ -220,29 +228,30 @@ static size_t getFileSize(const char* const filename)
 static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
 {
     unsigned const nextJob = ctx->nextJobID;
-    jobDescription job = ctx->jobs[nextJob];
-    job.compressionLevel = ctx->compressionLevel;
-    job.src.start = malloc(srcSize);
-    job.src.size = srcSize;
-    job.dst.size = ZSTD_compressBound(srcSize);
-    job.dst.start = malloc(job.dst.size);
-    job.jobCompleted = 0;
-    job.jobCompleted_cond = &ctx->jobCompleted_cond;
-    job.jobCompleted_mutex = &ctx->jobCompleted_mutex;
-    job.jobReady_cond = &ctx->jobReady_cond;
-    job.jobReady_mutex = &ctx->jobReady_mutex;
-    job.jobID = nextJob;
-    if (!job.src.start || !job.dst.start) {
+    jobDescription* job = &ctx->jobs[nextJob];
+    job->compressionLevel = ctx->compressionLevel;
+    job->src.start = malloc(srcSize);
+    job->src.size = srcSize;
+    job->dst.size = ZSTD_compressBound(srcSize);
+    job->dst.start = malloc(job->dst.size);
+    job->jobCompleted = 0;
+    job->jobCompleted_cond = &ctx->jobCompleted_cond;
+    job->jobCompleted_mutex = &ctx->jobCompleted_mutex;
+    job->jobReady_cond = &ctx->jobReady_cond;
+    job->jobReady_mutex = &ctx->jobReady_mutex;
+    job->jobID = nextJob;
+    if (!job->src.start || !job->dst.start) {
         /* problem occurred, free things then return */
-        if (job.src.start) free(job.src.start);
-        if (job.dst.start) free(job.dst.start);
+        DISPLAY("Error: problem occurred during job creation\n");
+        if (job->src.start) free(job->src.start);
+        if (job->dst.start) free(job->dst.start);
         return 1;
     }
-    memcpy(job.src.start, data, srcSize);
-    pthread_mutex_lock(job.jobReady_mutex);
-    job.jobReady = 1;
-    pthread_cond_signal(job.jobReady_cond);
-    pthread_mutex_unlock(job.jobReady_mutex);
+    memcpy(job->src.start, data, srcSize);
+    pthread_mutex_lock(job->jobReady_mutex);
+    job->jobReady = 1;
+    pthread_cond_signal(job->jobReady_cond);
+    pthread_mutex_unlock(job->jobReady_mutex);
     ctx->nextJobID++;
     return 0;
 }
@@ -305,14 +314,12 @@ int main(int argCount, const char* argv[])
 
     /* creating jobs */
     for ( ; ; ) {
-        DISPLAY("in job creation loop\n");
         size_t const readSize = fread(src, 1, FILE_CHUNK_SIZE, srcFile);
         if (readSize != FILE_CHUNK_SIZE && !feof(srcFile)) {
             DISPLAY("Error: problem occurred during read from src file\n");
             ret = 1;
             goto cleanup;
         }
-        DISPLAY("reading was fine\n");
         /* reading was fine, now create the compression job */
         {
             int const error = createCompressionJob(ctx, src, readSize);
@@ -321,9 +328,12 @@ int main(int argCount, const char* argv[])
                 goto cleanup;
             }
         }
-        if (feof(srcFile)) break;
+        if (feof(srcFile)) {
+            ctx->lastJobID = ctx->nextJobID;
+            break;
+        }
     }
-    
+
 cleanup:
     /* file compression completed */
     ret  |= (srcFile != NULL) ? fclose(srcFile) : 0;