From: Paul Cruz Date: Fri, 21 Jul 2017 16:26:35 +0000 (-0700) Subject: added priority decision making for adapt compression level X-Git-Tag: v1.3.1^2~13^2^2~33 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=e929d3b787b4a75b14546aa5f3e2c2e601eb2299;p=thirdparty%2Fzstd.git added priority decision making for adapt compression level --- diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index f25d22a4c..758bf5589 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -308,43 +308,42 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) */ static void adaptCompressionLevel(adaptCCtx* ctx) { - /* check if compression is too slow */ - unsigned createChange; - unsigned writeChange; - unsigned compressionChange; + double createCompletion, compressionCompletion, writeCompletion; + double const threshold = 0.00001; pthread_mutex_lock(&ctx->completion_mutex.pMutex); - createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; - writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; - compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; - DEBUG(2, "compression level %u\n", ctx->compressionLevel); - DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured); - DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); - DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured); + createCompletion = ctx->createCompletionMeasured; + compressionCompletion = ctx->compressionCompletionMeasured; + writeCompletion = ctx->writeCompletionMeasured; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - { - unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel); - - DEBUG(2, "compressionFastChange %u\n", compressionFastChange); - - if (compressionFastChange) { - DEBUG(2, "compression level too low\n"); - ctx->compressionLevel += compressionFastChange; - } - else { - unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1); - DEBUG(2, "compression level too high\n"); - ctx->compressionLevel -= compressionSlowChange; - } + DEBUG(2, "create completion: %f\n", createCompletion); + DEBUG(2, "compression completion: %f\n", compressionCompletion); + DEBUG(2, "write completion: %f\n", writeCompletion); + /* adapt compression based on bottleneck */ + if (1 - createCompletion > threshold) { + /* job creation was not finished, compression thread waited */ + unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - createCompletion * MAX_COMPRESSION_LEVEL_CHANGE; + DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change); + ctx->compressionLevel += change; + } + else if (1 - writeCompletion > threshold) { + /* write thread was not finished, compression thread waited */ + unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - writeCompletion * MAX_COMPRESSION_LEVEL_CHANGE; + DEBUG(2, "increasing compression level %u by %u\n", ctx->compressionLevel, change); + ctx->compressionLevel += change; + } + else if (1 - compressionCompletion > threshold) { + /* compression thread was not finished, one of the other two threads waited */ + unsigned const change = MAX_COMPRESSION_LEVEL_CHANGE - compressionCompletion * MAX_COMPRESSION_LEVEL_CHANGE; + DEBUG(2, "decreasing compression level %u by %u\n", ctx->compressionLevel, change); + ctx->compressionLevel -= change; } - /* reset */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); ctx->createCompletionMeasured = 1; ctx->compressionCompletionMeasured = 1; ctx->writeCompletionMeasured = 1; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - DEBUG(2, "\n"); if (g_forceCompressionLevel) { ctx->compressionLevel = g_compressionLevel; @@ -375,7 +374,7 @@ static void* compressionThread(void* arg) /* compression thread is waiting, take measurements of write completion and read completion */ ctx->createCompletionMeasured = ctx->createCompletion; ctx->writeCompletionMeasured = ctx->writeCompletion; - DEBUG(2, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured); + DEBUG(3, "compression thread waiting : createCompletionMeasured %f : writeCompletionMeasured %f\n", ctx->createCompletionMeasured, ctx->writeCompletionMeasured); DEBUG(3, "create completion: %f\n", ctx->createCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job ready, nextJob: %u\n", currJob); @@ -516,7 +515,7 @@ static void* outputThread(void* arg) pthread_mutex_lock(&ctx->completion_mutex.pMutex); /* write thread is waiting, take measurement of compression completion */ ctx->compressionCompletionMeasured = ctx->compressionCompletion; - DEBUG(2, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); + DEBUG(3, "write thread waiting : compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); @@ -598,7 +597,7 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) pthread_mutex_lock(&ctx->completion_mutex.pMutex); /* creation thread is waiting, take measurement of compression completion */ ctx->compressionCompletionMeasured = ctx->compressionCompletion; - DEBUG(2, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured); + DEBUG(3, "creation thread waiting : compression completion measured : %f\n", ctx->compressionCompletionMeasured); DEBUG(3, "writeCompletion: %f\n", ctx->writeCompletion); pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);