From: Paul Cruz Date: Wed, 26 Jul 2017 17:05:10 +0000 (-0700) Subject: decrease completion requirements for change, move create thread wait, merge cases... X-Git-Tag: v1.3.1^2~13^2^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=be92a38d6a76ed987c44a4a364161190a8715b36;p=thirdparty%2Fzstd.git decrease completion requirements for change, move create thread wait, merge cases where compression thread should wait --- diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 16701e75d..1633b6bf8 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -322,10 +322,10 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) /* map completion percentages to values for changing compression level */ static unsigned convertCompletionToChange(double completion) { - if (completion < 0.05) { + if (completion < 0.1) { return 2; } - else if (completion < 0.5) { + else if (completion < 0.65) { return 1; } else { @@ -396,9 +396,9 @@ static void adaptCompressionLevel(adaptCCtx* ctx) DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange); } - else if (1-compressWaitWriteCompletion > threshold) { + else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) { /* compress waiting on write */ - double const completion = compressWaitWriteCompletion; + double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion); unsigned const change = convertCompletionToChange(completion); unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); if (ctx->convergenceCounter > CONVERGENCE_LOWER_BOUND && boundChange != 0) { @@ -406,26 +406,11 @@ static void adaptCompressionLevel(adaptCCtx* ctx) } else if (boundChange != 0) { ctx->compressionLevel += boundChange; + ctx->cooldown = 0; ctx->convergenceCounter = 1; } - DEBUG(2, "compress waiting on write, tried to increase compression level by %u\n\n", boundChange); - } - else if (1-compressWaitCreateCompletion > threshold) { - /* compress waiting on create*/ - /* use compressWaitCreateCompletion */ - double const completion = compressWaitCreateCompletion; - unsigned const change = convertCompletionToChange(completion); - unsigned const boundChange = MIN(change, ZSTD_maxCLevel() - ctx->compressionLevel); - if (ctx->convergenceCounter > CONVERGENCE_LOWER_BOUND && boundChange != 0) { - ctx->convergenceCounter = 0; - } - else if (boundChange != 0) { - ctx->compressionLevel += boundChange; - ctx->convergenceCounter = 1; - } - - DEBUG(2, "compression waiting on create, tried to increase compression level by %u\n\n", boundChange); + DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange); } if (ctx->compressionLevel == prevCompressionLevel) { @@ -689,17 +674,6 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) jobDescription* const job = &ctx->jobs[nextJobIndex]; - /* wait until the job has been compressed */ - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); - while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { - pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); - /* creation thread is waiting, take measurement of completion */ - ctx->createWaitCompressionCompletion = ctx->compressionCompletion; - DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); - pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); - pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); - } - pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); /* reset create completion */ pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex); ctx->createCompletion = 0; @@ -767,8 +741,22 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA size_t pos = 0; size_t const readBlockSize = 1 << 15; size_t remaining = FILE_CHUNK_SIZE; - + unsigned const nextJob = ctx->nextJobID; DEBUG(2, "starting creation of job %u\n", currJob); + + + /* wait until the job has been compressed */ + pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); + while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) { + pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex); + /* creation thread is waiting, take measurement of completion */ + ctx->createWaitCompressionCompletion = ctx->compressionCompletion; + DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion); + pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex); + pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); + } + pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); + while (remaining != 0 && !feof(srcFile)) { size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile); if (ret != readBlockSize && !feof(srcFile)) {