unsigned jobWriteID;
unsigned allJobsCompleted;
unsigned adaptParam;
- unsigned completionMeasured;
- double completion;
+ unsigned compressionCompletionMeasured;
+ double compressionCompletion;
mutex_t jobCompressed_mutex;
cond_t jobCompressed_cond;
mutex_t jobReady_mutex;
reset = 1;
}
else if (compressSlow && ctx->compressionLevel > 1) {
- double const completion = ctx->completion;
+ double const completion = ctx->compressionCompletion;
unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
ctx->stats.readyCounter = 0;
ctx->stats.writeCounter = 0;
ctx->stats.compressedCounter = 0;
- ctx->completion = 1;
- ctx->completionMeasured = 0;
+ ctx->compressionCompletion = 1;
+ ctx->compressionCompletionMeasured = 0;
}
}
}
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
reduceCounters(ctx);
- if (!ctx->completionMeasured) {
- ctx->completion = ZSTD_getCompletion(ctx->cctx);
- ctx->completionMeasured = 1;
+ if (!ctx->compressionCompletionMeasured) {
+ ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
+ ctx->compressionCompletionMeasured = 1;
}
adaptCompressionLevel(ctx);
- DEBUG(3, "output detected completion: %f\n", ctx->completion);
+ DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
DEBUG(3, "outputThread(): continuing after job compressed\n");
{
size_t const compressedSize = job->compressedSize;
+ size_t remaining = compressedSize;
if (ZSTD_isError(compressedSize)) {
DISPLAY("Error: an error occurred during compression\n");
signalErrorToThreads(ctx);
return arg;
}
{
- size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
- if (writeSize != compressedSize) {
+ // size_t const writeSize = fwrite(job->dst.start, 1, compressedSize, dstFile);
+ size_t const blockSize = 4 << 20;
+ size_t pos = 0;
+ for ( ; ; ) {
+ size_t const writeSize = MIN(remaining, blockSize);
+ size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
+ if (ret != writeSize) break;
+ pos += ret;
+ remaining -= ret;
+ if (remaining == 0) break;
+ }
+ if (pos != compressedSize) {
DISPLAY("Error: an error occurred during file write operation\n");
signalErrorToThreads(ctx);
return arg;
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
reduceCounters(ctx);
- if (!ctx->completionMeasured) {
- ctx->completion = ZSTD_getCompletion(ctx->cctx);
- ctx->completionMeasured = 1;
+ if (!ctx->compressionCompletion) {
+ ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
+ ctx->compressionCompletionMeasured = 1;
}
adaptCompressionLevel(ctx);
- DEBUG(3, "job creation detected completion %f\n", ctx->completion);
+ DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}