From: Paul Cruz Date: Fri, 21 Jul 2017 01:45:33 +0000 (-0700) Subject: semi working version that stabilizes X-Git-Tag: v1.3.1^2~13^2^2~34 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9259c7afa412af57e7f578bc8e1f2c5d3906949c;p=thirdparty%2Fzstd.git semi working version that stabilizes --- diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 3c6b7e904..f25d22a4c 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -308,46 +308,46 @@ static void waitUntilAllJobsCompleted(adaptCCtx* ctx) */ static void adaptCompressionLevel(adaptCCtx* ctx) { - if (g_forceCompressionLevel) { - ctx->compressionLevel = g_compressionLevel; - } - else { - DEBUG(2, "compression level %u\n", ctx->compressionLevel); - /* check if compression is too slow */ - unsigned createChange; - unsigned writeChange; - unsigned compressionChange; - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; - writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; - compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; - DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured); - DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); - DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured); - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + /* check if compression is too slow */ + unsigned createChange; + unsigned writeChange; + unsigned compressionChange; + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + createChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->createCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; + writeChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->writeCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; + compressionChange = MAX_COMPRESSION_LEVEL_CHANGE - ctx->compressionCompletionMeasured * MAX_COMPRESSION_LEVEL_CHANGE; + DEBUG(2, "compression level %u\n", ctx->compressionLevel); + DEBUG(2, "createCompletionMeasured %f\n", ctx->createCompletionMeasured); + DEBUG(2, "compressionCompletionMeasured %f\n", ctx->compressionCompletionMeasured); + DEBUG(2, "writeCompletionMeasured %f\n", ctx->writeCompletionMeasured); + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - { - unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel); - DEBUG(2, "compressionFastChange %u\n", compressionFastChange); + { + unsigned const compressionFastChange = MIN(MIN(createChange, writeChange), ZSTD_maxCLevel() - ctx->compressionLevel); - if (compressionFastChange) { - DEBUG(2, "compression level too low\n"); - ctx->compressionLevel += compressionFastChange; - } - else { - unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1); - DEBUG(2, "compression level too high\n"); - ctx->compressionLevel -= compressionSlowChange; - } + DEBUG(2, "compressionFastChange %u\n", compressionFastChange); + + if (compressionFastChange) { + DEBUG(2, "compression level too low\n"); + ctx->compressionLevel += compressionFastChange; } + else { + unsigned const compressionSlowChange = MIN(compressionChange, ctx->compressionLevel-1); + DEBUG(2, "compression level too high\n"); + ctx->compressionLevel -= compressionSlowChange; + } + } - /* reset */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->createCompletionMeasured = 1; - ctx->compressionCompletionMeasured = 1; - ctx->writeCompletionMeasured = 1; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - DEBUG(2, "\n"); + /* reset */ + pthread_mutex_lock(&ctx->completion_mutex.pMutex); + ctx->createCompletionMeasured = 1; + ctx->compressionCompletionMeasured = 1; + ctx->writeCompletionMeasured = 1; + pthread_mutex_unlock(&ctx->completion_mutex.pMutex); + DEBUG(2, "\n"); + + if (g_forceCompressionLevel) { + ctx->compressionLevel = g_compressionLevel; } } @@ -368,10 +368,6 @@ static void* compressionThread(void* arg) jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "compressionThread(): waiting on job ready\n"); - /* new job, reset completion */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->compressionCompletion = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); pthread_mutex_lock(&ctx->jobReady_mutex.pMutex); while(currJob + 1 > ctx->jobReadyID && !ctx->threadError) { @@ -387,9 +383,9 @@ static void* compressionThread(void* arg) } pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex); - /* reset create completion */ + /* reset compression completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->createCompletion = 0; + ctx->compressionCompletion = 0; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "compressionThread(): continuing after job ready\n"); @@ -397,7 +393,7 @@ static void* compressionThread(void* arg) DEBUG(3, "%.*s", (int)job->src.size, (char*)job->src.start); /* adapt compression level */ - adaptCompressionLevel(ctx); + if (currJob) adaptCompressionLevel(ctx); /* compress the data */ { @@ -515,11 +511,6 @@ static void* outputThread(void* arg) jobDescription* job = &ctx->jobs[currJobIndex]; DEBUG(3, "outputThread(): waiting on job compressed\n"); - /* new job, reset completion */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->writeCompletion = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex); while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) { pthread_mutex_lock(&ctx->completion_mutex.pMutex); @@ -532,9 +523,9 @@ static void* outputThread(void* arg) } pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex); - /* reset compression completion */ + /* reset write completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->compressionCompletion = 0; + ctx->writeCompletion = 0; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "outputThread(): continuing after job compressed\n"); @@ -615,9 +606,9 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) } pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex); - /* reset write completion */ + /* reset create completion */ pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->writeCompletion = 0; + ctx->createCompletion = 0; pthread_mutex_unlock(&ctx->completion_mutex.pMutex); DEBUG(3, "createCompressionJob(): continuing after job write\n"); @@ -688,11 +679,6 @@ static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadA size_t const readBlockSize = 1 << 15; size_t remaining = FILE_CHUNK_SIZE; - /* new job reset completion */ - pthread_mutex_lock(&ctx->completion_mutex.pMutex); - ctx->createCompletion = 0; - pthread_mutex_unlock(&ctx->completion_mutex.pMutex); - while (remaining != 0 && !feof(srcFile)) { size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile); if (ret != readBlockSize && !feof(srcFile)) {