From: Paul Cruz Date: Thu, 6 Jul 2017 23:06:53 +0000 (-0700) Subject: added some basic logic for altering compression level X-Git-Tag: v1.3.1^2~13^2^2~108 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=ff9f2cd057041543851489c1ce8cc179efc2dacc;p=thirdparty%2Fzstd.git added some basic logic for altering compression level --- diff --git a/contrib/adaptive-compression/multi.c b/contrib/adaptive-compression/multi.c index 6aa60315d..2b2aacabf 100644 --- a/contrib/adaptive-compression/multi.c +++ b/contrib/adaptive-compression/multi.c @@ -1,12 +1,13 @@ #define DISPLAY(...) fprintf(stderr, __VA_ARGS__) #define DEBUGLOG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } } #define FILE_CHUNK_SIZE 4 << 20 -#define MAX_NUM_JOBS 100; +#define MAX_NUM_JOBS 2; #define stdinmark "/*stdin*\\" #define stdoutmark "/*stdout*\\" #define MAX_PATH 256 #define DEFAULT_DISPLAY_LEVEL 1 #define DEFAULT_COMPRESSION_LEVEL 6 +#define DEFAULT_ADAPT_PARAM 2 typedef unsigned char BYTE; #include /* fprintf */ @@ -27,7 +28,10 @@ typedef struct { typedef struct { unsigned waitCompleted; unsigned waitReady; - unsigned waitWritten; + unsigned waitWrite; + unsigned readyCounter; + unsigned completedCounter; + unsigned writeCounter; } stat_t; typedef struct { @@ -47,8 +51,9 @@ typedef struct { unsigned threadError; unsigned jobReadyID; unsigned jobCompletedID; - unsigned jobWrittenID; + unsigned jobWriteID; unsigned allJobsCompleted; + unsigned adaptParam; pthread_mutex_t jobCompleted_mutex; pthread_cond_t jobCompleted_cond; pthread_mutex_t jobReady_mutex; @@ -113,12 +118,13 @@ static adaptCCtx* createCCtx(unsigned numJobs, const char* const outFilename) ctx->numJobs = numJobs; ctx->jobReadyID = 0; ctx->jobCompletedID = 0; - ctx->jobWrittenID = 0; + ctx->jobWriteID = 0; ctx->lastJobID = -1; /* intentional underflow */ ctx->jobs = calloc(1, numJobs*sizeof(jobDescription)); ctx->nextJobID = 0; ctx->threadError = 0; ctx->allJobsCompleted = 0; + ctx->adaptParam = DEFAULT_ADAPT_PARAM; if (!ctx->jobs) { DISPLAY("Error: could not allocate space for jobs during context creation\n"); freeCCtx(ctx); @@ -148,6 +154,41 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) pthread_mutex_unlock(&ctx->allJobsCompleted_mutex); } +static unsigned adaptCompressionLevel(adaptCCtx* ctx) +{ + unsigned reset = 0; + unsigned const allSlow = ctx->adaptParam < ctx->stats.completedCounter && ctx->adaptParam < ctx->stats.writeCounter && ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0; + unsigned const compressWaiting = ctx->adaptParam < ctx->stats.readyCounter ? 1 : 0; + unsigned const writeWaiting = ctx->adaptParam < ctx->stats.completedCounter ? 1 : 0; + unsigned const createWaiting = ctx->adaptParam < ctx->stats.writeCounter ? 1 : 0; + unsigned const writeSlow = ((compressWaiting && createWaiting) || (createWaiting && !writeWaiting)) ? 1 : 0; + unsigned const compressSlow = ((writeWaiting && createWaiting) || (writeWaiting && !compressWaiting)) ? 1 : 0; + unsigned const createSlow = ((compressWaiting && writeWaiting) || (compressWaiting && !createWaiting)) ? 1 : 0; + // unsigned const writeSlow = ((compressWaiting && createWaiting)) ? 1 : 0; + // unsigned const compressSlow = ((writeWaiting && createWaiting)) ? 1 : 0; + // unsigned const createSlow = ((compressWaiting && writeWaiting)) ? 1 : 0; + DEBUGLOG(2, "ready: %u completed: %u write: %u\n", ctx->stats.readyCounter, ctx->stats.completedCounter, ctx->stats.writeCounter); + if (allSlow) { + reset = 1; + } + else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) { + DEBUGLOG(2, "increasing compression level %u\n", ctx->compressionLevel); + ctx->compressionLevel++; + reset = 1; + } + else if (compressSlow && ctx->compressionLevel > 1) { + DEBUGLOG(2, "decreasing compression level %u\n", ctx->compressionLevel); + ctx->compressionLevel--; + reset = 1; + } + if (reset) { + ctx->stats.readyCounter = 0; + ctx->stats.writeCounter = 0; + ctx->stats.completedCounter = 0; + } + return ctx->compressionLevel; +} + static void* compressionThread(void* arg) { adaptCCtx* ctx = (adaptCCtx*)arg; @@ -159,6 +200,7 @@ static void* compressionThread(void* arg) pthread_mutex_lock(&ctx->jobReady_mutex); while(currJob + 1 > ctx->jobReadyID) { ctx->stats.waitReady++; + ctx->stats.readyCounter++; DEBUGLOG(2, "waiting on job ready, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobReady_cond, &ctx->jobReady_mutex); } @@ -166,7 +208,10 @@ static void* compressionThread(void* arg) // DEBUGLOG(2, "compressionThread(): continuing after job ready\n"); /* compress the data */ { - size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, job->compressionLevel); + unsigned const cLevel = adaptCompressionLevel(ctx); + // unsigned const cLevel = job->compressionLevel; + DEBUGLOG(2, "cLevel used: %u\n", cLevel); + size_t const compressedSize = ZSTD_compress(job->dst.start, job->dst.size, job->src.start, job->src.size, cLevel); if (ZSTD_isError(compressedSize)) { ctx->threadError = 1; DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(compressedSize)); @@ -202,6 +247,7 @@ static void* outputThread(void* arg) pthread_mutex_lock(&ctx->jobCompleted_mutex); while (currJob + 1 > ctx->jobCompletedID) { ctx->stats.waitCompleted++; + ctx->stats.completedCounter++; DEBUGLOG(2, "waiting on job completed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompleted_cond, &ctx->jobCompleted_mutex); } @@ -225,7 +271,7 @@ static void* outputThread(void* arg) currJob++; DEBUGLOG(2, "locking job write mutex\n"); pthread_mutex_lock(&ctx->jobWrite_mutex); - ctx->jobWrittenID++; + ctx->jobWriteID++; pthread_cond_signal(&ctx->jobWrite_cond); pthread_mutex_unlock(&ctx->jobWrite_mutex); DEBUGLOG(2, "unlocking job write mutex\n"); @@ -251,14 +297,17 @@ static int createCompressionJob(adaptCCtx* ctx, BYTE* data, size_t srcSize) jobDescription* job = &ctx->jobs[nextJobIndex]; // DEBUGLOG(2, "createCompressionJob(): wait for job write\n"); pthread_mutex_lock(&ctx->jobWrite_mutex); - // DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompletedID: %u, jobWrittenID: %u, numJObs: %u\n", nextJob,ctx->jobCompletedID, ctx->jobWrittenID, ctx->numJobs); - while (nextJob - ctx->jobWrittenID >= ctx->numJobs) { - ctx->stats.waitWritten++; - DEBUGLOG(2, "waiting on job written, nextJob: %u\n", nextJob); + // DEBUGLOG(2, "Creating new compression job -- nextJob: %u, jobCompletedID: %u, jobWriteID: %u, numJObs: %u\n", nextJob,ctx->jobCompletedID, ctx->jobWriteID, ctx->numJobs); + while (nextJob - ctx->jobWriteID >= ctx->numJobs) { + ctx->stats.waitWrite++; + ctx->stats.writeCounter++; + DEBUGLOG(2, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond, &ctx->jobWrite_mutex); } pthread_mutex_unlock(&ctx->jobWrite_mutex); // DEBUGLOG(2, "createCompressionJob(): continuing after job write\n"); + + job->compressionLevel = ctx->compressionLevel; job->src.start = malloc(srcSize); job->src.size = srcSize; @@ -287,7 +336,7 @@ static void printStats(stat_t stats) DISPLAY("========STATISTICS========\n"); DISPLAY("# times waited on job ready: %u\n", stats.waitReady); DISPLAY("# times waited on job completed: %u\n", stats.waitCompleted); - DISPLAY("# times waited on job written: %u\n\n", stats.waitWritten); + DISPLAY("# times waited on job Write: %u\n\n", stats.waitWrite); } static int compressFilename(const char* const srcFilename, const char* const dstFilename)