]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added print statements and debuglog
authorPaul Cruz <paulcruz74@fb.com>
Wed, 5 Jul 2017 23:54:34 +0000 (16:54 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Wed, 5 Jul 2017 23:54:34 +0000 (16:54 -0700)
contrib/adaptive-compression/multi.c
contrib/adaptive-compression/pipetests.sh [new file with mode: 0755]

index 089dfe5a3f83bfedf8a436bfb941096b8f7d16e1..40045381f8da08a3bba12c4a2c18cb4b9b92b8d1 100644 (file)
@@ -1,9 +1,11 @@
 #define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
+#define DEBUGLOG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
 #define FILE_CHUNK_SIZE 4 << 20
 #define MAX_NUM_JOBS 50;
 #define stdinmark  "/*stdin*\\"
 #define stdoutmark "/*stdout*\\"
 #define MAX_PATH 256
+#define DEFAULT_DISPLAY_LEVEL 1
 typedef unsigned char BYTE;
 
 #include <stdio.h>      /* fprintf */
@@ -12,7 +14,7 @@ typedef unsigned char BYTE;
 #include <string.h>     /* memset */
 #include "zstd.h"
 
-
+static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
 
 typedef struct {
     void* start;
@@ -158,11 +160,13 @@ static void* compressionThread(void* arg)
     for ( ; ; ) {
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
+        // DEBUGLOG(2, "compressionThread(): waiting on job ready\n");
         pthread_mutex_lock(job->jobReady_mutex);
         while(job->jobReady == 0) {
             pthread_cond_wait(job->jobReady_cond, job->jobReady_mutex);
         }
         pthread_mutex_unlock(job->jobReady_mutex);
+        // DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
         /* compress the data */
         {
             size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, job->compressionLevel);
@@ -175,11 +179,13 @@ static void* compressionThread(void* arg)
         }
         pthread_mutex_lock(job->jobCompleted_mutex);
         job->jobCompleted = 1;
+        DEBUGLOG(2, "signaling for job %u\n", currJob);
         pthread_cond_signal(job->jobCompleted_cond);
         pthread_mutex_unlock(job->jobCompleted_mutex);
         currJob++;
         if (currJob >= ctx->lastJobID || ctx->threadError) {
             /* finished compressing all jobs */
+            DEBUGLOG(2, "all jobs finished compressing\n");
             break;
         }
     }
@@ -194,11 +200,14 @@ static void* outputThread(void* arg)
     for ( ; ; ) {
         unsigned const currJobIndex = currJob % ctx->numJobs;
         jobDescription* job = &ctx->jobs[currJobIndex];
+        DEBUGLOG(2, "outputThread(): waiting on job completed\n");
         pthread_mutex_lock(job->jobCompleted_mutex);
         while (job->jobCompleted == 0) {
+            DEBUGLOG(2, "inside job completed wait loop waiting on %u\n", currJob);
             pthread_cond_wait(job->jobCompleted_cond, job->jobCompleted_mutex);
         }
         pthread_mutex_unlock(job->jobCompleted_mutex);
+        DEBUGLOG(2, "outputThread(): continuing after job completed\n");
         {
             size_t const compressedSize = job->compressedSize;
             if (ZSTD_isError(compressedSize)) {
@@ -214,12 +223,17 @@ static void* outputThread(void* arg)
             }
         }
         currJob++;
+        DEBUGLOG(2, "locking job write mutex\n");
         pthread_mutex_lock(job->jobWrite_mutex);
         job->jobWritten = 1;
         pthread_cond_signal(job->jobWrite_cond);
         pthread_mutex_unlock(job->jobWrite_mutex);
+        DEBUGLOG(2, "unlocking job write mutex\n");
+
+        DEBUGLOG(2, "checking if done: %u/%u\n", currJob, ctx->lastJobID);
         if (currJob >= ctx->lastJobID || ctx->threadError) {
             /* finished with all jobs */
+            DEBUGLOG(2, "all jobs finished writing\n");
             pthread_mutex_lock(&ctx->allJobsCompleted_mutex);
             ctx->allJobsCompleted = 1;
             pthread_cond_signal(&ctx->allJobsCompleted_cond);
@@ -235,11 +249,13 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
     unsigned const nextJob = ctx->nextJobID;
     unsigned const nextJobIndex = nextJob % ctx->numJobs;
     jobDescription* job = &ctx->jobs[nextJobIndex];
+    // DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
     pthread_mutex_lock(job->jobWrite_mutex);
     while (job->jobWritten == 0) {
         pthread_cond_wait(job->jobWrite_cond, job->jobWrite_mutex);
     }
     pthread_mutex_unlock(job->jobWrite_mutex);
+    // DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
     job->compressionLevel = ctx->compressionLevel;
     job->src.start = malloc(srcSize);
     job->src.size = srcSize;
@@ -329,6 +345,7 @@ static int compressFilename(const char* const srcFilename, const char* const dst
             }
         }
         if (feof(srcFile)) {
+            DEBUGLOG(2, "THE STREAM OF DATA ENDED %u\n", ctx->nextJobID);
             ctx->lastJobID = ctx->nextJobID;
             break;
         }
@@ -384,6 +401,10 @@ int main(int argCount, const char* argv[])
                 outFilename = argument;
                 continue;
             }
+            else if (strlen(argument) > 1 && argument[1] == 'v') {
+                g_displayLevel++;
+                continue;
+            }
             else {
                 DISPLAY("Error: invalid argument provided\n");
                 ret = 1;
diff --git a/contrib/adaptive-compression/pipetests.sh b/contrib/adaptive-compression/pipetests.sh
new file mode 100755 (executable)
index 0000000..2924cd5
--- /dev/null
@@ -0,0 +1,2 @@
+make clean multi
+pv -q -L 50m tests/test2048.pdf | ./multi -v -otmp.zst