]> git.ipfire.org Git - thirdparty/zstd.git/commitdiff
added some basic logic for altering compression level
authorPaul Cruz <paulcruz74@fb.com>
Thu, 6 Jul 2017 23:06:53 +0000 (16:06 -0700)
committerPaul Cruz <paulcruz74@fb.com>
Thu, 6 Jul 2017 23:06:53 +0000 (16:06 -0700)
contrib/adaptive-compression/multi.c

index 6aa60315d1e45f0dea60b0500baf715967828151..2b2aacabf5fb3de573eafa7cfe26da8139b1666a 100644 (file)
@@ -1,12 +1,13 @@
 #define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
 #define DEBUGLOG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
 #define FILE_CHUNK_SIZE 4 << 20
-#define MAX_NUM_JOBS 100;
+#define MAX_NUM_JOBS 2;
 #define stdinmark  "/*stdin*\\"
 #define stdoutmark "/*stdout*\\"
 #define MAX_PATH 256
 #define DEFAULT_DISPLAY_LEVEL 1
 #define DEFAULT_COMPRESSION_LEVEL 6
+#define DEFAULT_ADAPT_PARAM 2
 typedef unsigned char BYTE;
 
 #include <stdio.h>      /* fprintf */
@@ -27,7 +28,10 @@ typedef struct {
 typedef struct {
     unsigned waitCompleted;
     unsigned waitReady;
-    unsigned waitWritten;
+    unsigned waitWrite;
+    unsigned readyCounter;
+    unsigned completedCounter;
+    unsigned writeCounter;
 } stat_t;
 
 typedef struct {
@@ -47,8 +51,9 @@ typedef struct {
     unsigned threadError;
     unsigned jobReadyID;
     unsigned jobCompletedID;
-    unsigned jobWrittenID;
+    unsigned jobWriteID;
     unsigned allJobsCompleted;
+    unsigned adaptParam;
     pthread_mutex_t jobCompleted_mutex;
     pthread_cond_t jobCompleted_cond;
     pthread_mutex_t jobReady_mutex;
@@ -113,12 +118,13 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename)
     ctx->numJobs = numJobs;
     ctx->jobReadyID = 0;
     ctx->jobCompletedID = 0;
-    ctx->jobWrittenID = 0;
+    ctx->jobWriteID = 0;
     ctx->lastJobID = -1; /* intentional underflow */
     ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));
     ctx->nextJobID = 0;
     ctx->threadError = 0;
     ctx->allJobsCompleted = 0;
+    ctx->adaptParam = DEFAULT_ADAPT_PARAM;
     if (!ctx->jobs) {
         DISPLAY("Error: could not allocate space for jobs during context creation\n");
         freeCCtx(ctx);
@@ -148,6 +154,41 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
     pthread_mutex_unlock(&ctx->allJobsCompleted_mutex);
 }
 
+static unsigned adaptCompressionLevel(adaptCCtx* ctx)
+{
+    unsigned reset = 0;
+    unsigned const allSlow = ctx->adaptParam < ctx->stats.completedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0;
+    unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0;
+    unsigned const writeWaiting = ctx->adaptParam < ctx->stats.completedCounter ? 1 : 0;
+    unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter ? 1 : 0;
+    unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting)) ? 1 : 0;
+    unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting)) ? 1 : 0;
+    unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting)) ? 1 : 0;
+    // unsigned const writeSlow = ((compressWaiting && createWaiting)) ? 1 : 0;
+    // unsigned const compressSlow = ((writeWaiting && createWaiting)) ? 1 : 0;
+    // unsigned const createSlow = ((compressWaiting && writeWaiting)) ? 1 : 0;
+    DEBUGLOG(2, "ready: %u completed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.completedCounter, ctx->stats.writeCounter);
+    if (allSlow) {
+        reset = 1;
+    }
+    else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
+        DEBUGLOG(2, "increasing compression level %u\n", ctx->compressionLevel);
+        ctx->compressionLevel++;
+        reset = 1;
+    }
+    else if (compressSlow && ctx->compressionLevel > 1) {
+        DEBUGLOG(2, "decreasing compression level %u\n", ctx->compressionLevel);
+        ctx->compressionLevel--;
+        reset = 1;
+    }
+    if (reset) {
+        ctx->stats.readyCounter = 0;
+        ctx->stats.writeCounter = 0;
+        ctx->stats.completedCounter = 0;
+    }
+    return ctx->compressionLevel;
+}
+
 static void* compressionThread(void* arg)
 {
     adaptCCtx* ctx = (adaptCCtx*)arg;
@@ -159,6 +200,7 @@ static void* compressionThread(void* arg)
         pthread_mutex_lock(&ctx->jobReady_mutex);
         while(currJob + 1 > ctx->jobReadyID) {
             ctx->stats.waitReady++;
+            ctx->stats.readyCounter++;
             DEBUGLOG(2, "waiting on job ready, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex);
         }
@@ -166,7 +208,10 @@ static void* compressionThread(void* arg)
         // DEBUGLOG(2, "compressionThread(): continuing after job ready\n");
         /* compress the data */
         {
-            size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, job->compressionLevel);
+            unsigned const cLevel = adaptCompressionLevel(ctx);
+            // unsigned const cLevel = job->compressionLevel;
+            DEBUGLOG(2, "cLevel used: %u\n", cLevel);
+            size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, cLevel);
             if (ZSTD_isError(compressedSize)) {
                 ctx->threadError = 1;
                 DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(compressedSize));
@@ -202,6 +247,7 @@ static void* outputThread(void* arg)
         pthread_mutex_lock(&ctx->jobCompleted_mutex);
         while (currJob + 1 > ctx->jobCompletedID) {
             ctx->stats.waitCompleted++;
+            ctx->stats.completedCounter++;
             DEBUGLOG(2, "waiting on job completed, nextJob: %u\n", currJob);
             pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex);
         }
@@ -225,7 +271,7 @@ static void* outputThread(void* arg)
         currJob++;
         DEBUGLOG(2, "locking job write mutex\n");
         pthread_mutex_lock(&ctx->jobWrite_mutex);
-        ctx->jobWrittenID++;
+        ctx->jobWriteID++;
         pthread_cond_signal(&ctx->jobWrite_cond);
         pthread_mutex_unlock(&ctx->jobWrite_mutex);
         DEBUGLOG(2, "unlocking job write mutex\n");
@@ -251,14 +297,17 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize)
     jobDescription* job = &ctx->jobs[nextJobIndex];
     // DEBUGLOG(2, "createCompressionJob(): wait for job write\n");
     pthread_mutex_lock(&ctx->jobWrite_mutex);
-    // DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompletedID: %u, jobWrittenID: %u, numJObs: %u\n", nextJob,ctx->jobCompletedID, ctx->jobWrittenID, ctx->numJobs);
-    while (nextJob - ctx->jobWrittenID >= ctx->numJobs) {
-        ctx->stats.waitWritten++;
-        DEBUGLOG(2, "waiting on job written, nextJob: %u\n", nextJob);
+    // DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompletedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompletedID, ctx->jobWriteID, ctx->numJobs);
+    while (nextJob - ctx->jobWriteID >= ctx->numJobs) {
+        ctx->stats.waitWrite++;
+        ctx->stats.writeCounter++;
+        DEBUGLOG(2, "waiting on job Write, nextJob: %u\n", nextJob);
         pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex);
     }
     pthread_mutex_unlock(&ctx->jobWrite_mutex);
     // DEBUGLOG(2, "createCompressionJob(): continuing after job write\n");
+
+
     job->compressionLevel = ctx->compressionLevel;
     job->src.start = malloc(srcSize);
     job->src.size = srcSize;
@@ -287,7 +336,7 @@ static void printStats(stat_t stats)
     DISPLAY("========STATISTICS========\n");
     DISPLAY("# times waited on job ready: %u\n", stats.waitReady);
     DISPLAY("# times waited on job completed: %u\n", stats.waitCompleted);
-    DISPLAY("# times waited on job written: %u\n\n", stats.waitWritten);
+    DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite);
 }
 
 static int compressFilename(const char* const srcFilename, const char* const dstFilename)