From: Paul Cruz Date: Tue, 18 Jul 2017 20:30:29 +0000 (-0700) Subject: rename completion variable, split up fwrite operations in order to track progress X-Git-Tag: v1.3.1^2~13^2^2~54 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=29c36cf051545119fddfa898e8a03ce06e0380ef;p=thirdparty%2Fzstd.git rename completion variable, split up fwrite operations in order to track progress --- diff --git a/contrib/adaptive-compression/adapt.c b/contrib/adaptive-compression/adapt.c index 62f5ec912..404db6d94 100644 --- a/contrib/adaptive-compression/adapt.c +++ b/contrib/adaptive-compression/adapt.c @@ -87,8 +87,8 @@ typedef struct { unsigned jobWriteID; unsigned allJobsCompleted; unsigned adaptParam; - unsigned completionMeasured; - double completion; + unsigned compressionCompletionMeasured; + double compressionCompletion; mutex_t jobCompressed_mutex; cond_t jobCompressed_cond; mutex_t jobReady_mutex; @@ -319,7 +319,7 @@ static void adaptCompressionLevel(adaptCCtx* ctx) reset = 1; } else if (compressSlow && ctx->compressionLevel > 1) { - double const completion = ctx->completion; + double const completion = ctx->compressionCompletion; unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1; unsigned const change = MIN(maxChange, ctx->compressionLevel - 1); DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel); @@ -331,8 +331,8 @@ static void adaptCompressionLevel(adaptCCtx* ctx) ctx->stats.readyCounter = 0; ctx->stats.writeCounter = 0; ctx->stats.compressedCounter = 0; - ctx->completion = 1; - ctx->completionMeasured = 0; + ctx->compressionCompletion = 1; + ctx->compressionCompletionMeasured = 0; } } } @@ -455,12 +455,12 @@ static void* outputThread(void* arg) ctx->stats.waitCompressed++; ctx->stats.compressedCounter++; reduceCounters(ctx); - if (!ctx->completionMeasured) { - ctx->completion = ZSTD_getCompletion(ctx->cctx); - ctx->completionMeasured = 1; + if (!ctx->compressionCompletionMeasured) { + ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx); + ctx->compressionCompletionMeasured = 1; } adaptCompressionLevel(ctx); - DEBUG(3, "output detected completion: %f\n", ctx->completion); + DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion); DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob); pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex); } @@ -468,14 +468,25 @@ static void* outputThread(void* arg) DEBUG(3, "outputThread(): continuing after job compressed\n"); { size_t const compressedSize = job->compressedSize; + size_t remaining = compressedSize; if (ZSTD_isError(compressedSize)) { DISPLAY("Error: an error occurred during compression\n"); signalErrorToThreads(ctx); return arg; } { - size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); - if (writeSize != compressedSize) { + // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile); + size_t const blockSize = 4 << 20; + size_t pos = 0; + for ( ; ; ) { + size_t const writeSize = MIN(remaining, blockSize); + size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile); + if (ret != writeSize) break; + pos += ret; + remaining -= ret; + if (remaining == 0) break; + } + if (pos != compressedSize) { DISPLAY("Error: an error occurred during file write operation\n"); signalErrorToThreads(ctx); return arg; @@ -517,12 +528,12 @@ static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last) ctx->stats.waitWrite++; ctx->stats.writeCounter++; reduceCounters(ctx); - if (!ctx->completionMeasured) { - ctx->completion = ZSTD_getCompletion(ctx->cctx); - ctx->completionMeasured = 1; + if (!ctx->compressionCompletion) { + ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx); + ctx->compressionCompletionMeasured = 1; } adaptCompressionLevel(ctx); - DEBUG(3, "job creation detected completion %f\n", ctx->completion); + DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion); DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob); pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex); }