unsigned allJobsCompleted;
unsigned adaptParam;
unsigned compressionCompletionMeasured;
+ unsigned writeCompletionMeasured;
double compressionCompletion;
+ double writeCompletion;
mutex_t jobCompressed_mutex;
cond_t jobCompressed_cond;
mutex_t jobReady_mutex;
cond_t allJobsCompleted_cond;
mutex_t jobWrite_mutex;
cond_t jobWrite_cond;
+ mutex_t completion_mutex;
+ mutex_t stats_mutex;
size_t lastDictSize;
inBuff_t input;
cStat_t stats;
error |= destroyCond(&ctx->allJobsCompleted_cond);
error |= destroyMutex(&ctx->jobWrite_mutex);
error |= destroyCond(&ctx->jobWrite_cond);
+ error |= destroyMutex(&ctx->completion_mutex);
+ error |= destroyMutex(&ctx->stats_mutex);
error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
free(ctx->input.buffer.start);
if (ctx->jobs){
pthreadError |= initCond(&ctx->allJobsCompleted_cond);
pthreadError |= initMutex(&ctx->jobWrite_mutex);
pthreadError |= initCond(&ctx->jobWrite_cond);
+ pthreadError |= initMutex(&ctx->completion_mutex);
+ pthreadError |= initMutex(&ctx->stats_mutex);
if (pthreadError) return NULL;
}
ctx->numJobs = numJobs;
}
else if ((writeSlow || createSlow) && ctx->compressionLevel < (unsigned)ZSTD_maxCLevel()) {
DEBUG(3, "increasing compression level %u\n", ctx->compressionLevel);
- ctx->compressionLevel++;
- reset = 1;
+ double completion;
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ completion = ctx->writeCompletion;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ {
+ unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE - 1)) + 1;
+ unsigned const change = writeSlow ? MIN(maxChange, ZSTD_maxCLevel() - ctx->compressionLevel) : 1;
+ DEBUG(2, "writeSlow: %u, change: %u\n", writeSlow, change);
+ DEBUG(2, "write completion: %f\n", completion);
+ ctx->compressionLevel += change;
+ reset = 1;
+ }
}
else if (compressSlow && ctx->compressionLevel > 1) {
- double const completion = ctx->compressionCompletion;
- unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
- unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
- DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
- DEBUG(3, "completion: %f\n", completion);
- ctx->compressionLevel -= change;
- reset = 1;
+ double completion;
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ completion = ctx->compressionCompletion;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+ {
+ unsigned const maxChange = (unsigned)((1-completion) * (MAX_COMPRESSION_LEVEL_CHANGE-1)) + 1;
+ unsigned const change = MIN(maxChange, ctx->compressionLevel - 1);
+ DEBUG(3, "decreasing compression level %u\n", ctx->compressionLevel);
+ DEBUG(3, "completion: %f\n", completion);
+ ctx->compressionLevel -= change;
+ reset = 1;
+ }
}
if (reset) {
ctx->stats.readyCounter = 0;
ctx->stats.writeCounter = 0;
ctx->stats.compressedCounter = 0;
+
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
ctx->compressionCompletion = 1;
ctx->compressionCompletionMeasured = 0;
+ ctx->writeCompletion = 1;
+ ctx->writeCompletionMeasured = 0;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
}
}
}
ctx->stats.waitCompressed++;
ctx->stats.compressedCounter++;
reduceCounters(ctx);
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
if (!ctx->compressionCompletionMeasured) {
ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
ctx->compressionCompletionMeasured = 1;
+ DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
}
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
adaptCompressionLevel(ctx);
- DEBUG(3, "output detected completion: %f\n", ctx->compressionCompletion);
DEBUG(3, "waiting on job compressed, nextJob: %u\n", currJob);
pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
}
if (ret != writeSize) break;
pos += ret;
remaining -= ret;
+
+ /* update completion variable for writing */
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ if (!ctx->writeCompletionMeasured) {
+ ctx->writeCompletion = 1 - (double)remaining/compressedSize;
+ }
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
+
if (remaining == 0) break;
}
if (pos != compressedSize) {
ctx->stats.waitWrite++;
ctx->stats.writeCounter++;
reduceCounters(ctx);
- if (!ctx->compressionCompletion) {
- ctx->compressionCompletion = ZSTD_getCompletion(ctx->cctx);
- ctx->compressionCompletionMeasured = 1;
- }
+ pthread_mutex_lock(&ctx->completion_mutex.pMutex);
+ ctx->writeCompletionMeasured = 1;
+ pthread_mutex_unlock(&ctx->completion_mutex.pMutex);
adaptCompressionLevel(ctx);
- DEBUG(3, "job creation detected completion %f\n", ctx->compressionCompletion);
DEBUG(3, "waiting on job Write, nextJob: %u\n", nextJob);
pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
}
ctx->input.buffer.start = copy;
}
job->dictSize = ctx->lastDictSize;
- pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
- ctx->jobReadyID++;
- pthread_cond_signal(&ctx->jobReady_cond.pCond);
- pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
DEBUG(3, "finished job creation %u\n", nextJob);
ctx->nextJobID++;
DEBUG(3, "filled: %zu, srcSize: %zu\n", ctx->input.filled, srcSize);
ctx->lastDictSize = srcSize;
ctx->input.filled = srcSize;
}
+
+ /* signal job ready */
+ pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
+ ctx->jobReadyID++;
+ pthread_cond_signal(&ctx->jobReady_cond.pCond);
+ pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);
+
return 0;
}